/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.table.distributed.replicator;

import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RO_GET;
import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RO_GET_ALL;
import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_GET;
import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_GET_ALL;
import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_REPLACE;
import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_SCAN;
import static org.apache.ignite.internal.partitiondistribution.Assignments.fromBytes;
import static org.apache.ignite.internal.raft.PeersAndLearners.fromAssignments;
import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
import static org.apache.ignite.internal.table.distributed.replicator.RemoteResourceIds.cursorId;
import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.findAny;
import static org.apache.ignite.internal.util.IgniteUtils.findFirst;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.apache.ignite.lang.ErrorGroups.Replicator.CURSOR_CLOSE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.components.NodeProperties;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.lang.IgniteTriFunction;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import org.apache.ignite.internal.network.ClusterNodeResolver;
import org.apache.ignite.internal.partition.replicator.FuturesCleanupResult;
import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
import org.apache.ignite.internal.partition.replicator.ReplicaPrimacy;
import org.apache.ignite.internal.partition.replicator.ReplicaPrimacyEngine;
import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker;
import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import org.apache.ignite.internal.partition.replicator.TableAwareReplicaRequestPreProcessor;
import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
import org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
import org.apache.ignite.internal.partition.replicator.handlers.TxCleanupRecoveryRequestHandler;
import org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
import org.apache.ignite.internal.partition.replicator.handlers.TxRecoveryMessageHandler;
import org.apache.ignite.internal.partition.replicator.handlers.TxStateCommitPartitionReplicaRequestHandler;
import org.apache.ignite.internal.partition.replicator.handlers.VacuumTxStateReplicaRequestHandler;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
import org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
import org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessageBuilder;
import org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2Builder;
import org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
import org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.GetEstimatedSizeRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlySingleRowPkReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteMultiRowPkReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteMultiRowReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowPkReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSwapRowReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
import org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
import org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.partition.replicator.schemacompat.IncompatibleSchemaVersionException;
import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.CommandApplicationResult;
import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ReplicatorRecoverableExceptions;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.replicator.message.SchemaVersionAwareReplicaRequest;
import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowUpgrader;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleComparator;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.NullBinaryRow;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.util.StorageUtils;
import org.apache.ignite.internal.table.RowIdGenerator;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.TableUtils;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.UpdateCommandResult;
import org.apache.ignite.internal.tx.impl.FullyQualifiedResourceId;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.message.TableWriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequestBase;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/** Partition replication listener. */
public class PartitionReplicaListener implements ReplicaListener, ReplicaTableProcessor {
    /**
     * NB: this listener writes to the underlying MV partition storage without taking the partition snapshots read lock. This causes
     * the RAFT snapshots transferred to a follower to be slightly inconsistent for a limited amount of time.
     *
     * <p>A RAFT snapshot of a partition consists of MV data, TX state data and metadata (which includes RAFT applied index).
     * Here, the 'slight' inconsistency is that MV data might be ahead of the snapshot meta (namely, RAFT applied index) and TX state data.
     *
     * <p>This listener by its nature cannot advance RAFT applied index (as it works out of the RAFT framework). This alone makes
     * the partition 'slightly inconsistent' in the same way as defined above. So, if we solve this inconsistency, we don't need to take the
     * partition snapshots read lock as well.
     *
     * <p>The inconsistency does not cause any real problems because it is eventually resolved:
     * <ul>
     *     <li>If the follower with a 'slightly' inconsistent partition state becomes a primary replica, this requires it to apply
     *     whole available RAFT log from the leader before actually becoming a primary; this application will remove the inconsistency</li>
     *     <li>If a node with this inconsistency is going to become a primary and it is already the leader, the above does not help,
     *     but the write intent resolution procedure closes the gap.</li>
     *     <li>The two items above resolve the inconsistency for RW transactions.</li>
     *     <li>For RO reading from such a 'slightly inconsistent' partition, write intent resolution closes the gap as well.</li>
     * </ul>
     */
    @SuppressWarnings("unused") // We use it as a placeholder of a documentation which can be linked using # and @see.
    private static final Object INTERNAL_DOC_PLACEHOLDER = null;

    /** Logger. */
    private static final IgniteLogger LOG = Loggers.forClass(PartitionReplicaListener.class);

    /** Factory to create RAFT command messages. */
    private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY =
            new PartitionReplicationMessagesFactory();

    /** Factory for creating replica command messages. */
    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

    /** Replication group id. */
    // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Use ZonePartitionId instead.
    private final PartitionGroupId replicationGroupId;

    /** Id of the table whose partition this listener serves. */
    private final int tableId;

    /**
     * Lock key built in the constructor from {@link #tableId} and the partition id of {@link #replicationGroupId}.
     *
     * <p>Despite the fact that it's correct to use replicationGroupId as lock key it's better to preserve slightly higher (finer,
     * per-table) lock granularity.
     */
    private final TablePartitionId tableLockKey;

    /** Primary key index. */
    private final Lazy<TableSchemaAwareIndexStorage> pkIndexStorage;

    /** Secondary indices. */
    private final Supplier<Map<Integer, TableSchemaAwareIndexStorage>> secondaryIndexStorages;

    /** Versioned partition storage. */
    private final MvPartitionStorage mvDataStorage;

    /** Raft client. */
    private final RaftCommandRunner raftCommandRunner;

    /** Tx manager. */
    private final TxManager txManager;

    /** Lock manager. */
    private final LockManager lockManager;

    /** Handler that processes updates writing them to storage. */
    private final StorageUpdateHandler storageUpdateHandler;

    /** Resources registry. */
    private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry;

    /** Clock service. */
    private final ClockService clockService;

    /** Safe time. */
    private final PendingComparableValuesTracker<HybridTimestamp, Void> safeTime;

    /** Transaction state resolver. */
    private final TransactionStateResolver transactionStateResolver;

    /** Runs async scan tasks for effective tail recursion execution (avoid deep recursive calls). */
    private final Executor scanRequestExecutor;

    /** Supplies index lock helper objects; the Integer keys are presumably index ids (TODO confirm against the supplier). */
    private final Supplier<Map<Integer, IndexLocker>> indexesLockers;

    /**
     * Futures tracked per transaction for cleanup purposes.
     * NOTE(review): keys are presumably transaction ids — verify against usages further down the file.
     */
    private final ConcurrentMap<UUID, TxCleanupReadyFutureList> txCleanupReadyFutures = new ConcurrentHashMap<>();

    /** Cleanup futures, keyed by row id. */
    private final ConcurrentHashMap<RowId, CompletableFuture<?>> rowCleanupMap = new ConcurrentHashMap<>();

    /** Schema compatibility validator, created in the constructor from the validation schemas source. */
    private final SchemaCompatibilityValidator schemaCompatValidator;

    /** Schema synchronization service. */
    private final SchemaSyncService schemaSyncService;

    /** Catalog service. */
    private final CatalogService catalogService;

    /** Busy lock to stop synchronously. */
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    /** Prevents double stopping. */
    private final AtomicBoolean stopGuard = new AtomicBoolean();

    /**
     * Processor that handles catalog events {@link CatalogEvent#INDEX_BUILDING} and
     * tracks read-write transaction operations for building indexes.
     */
    private final PartitionReplicaBuildIndexProcessor indexBuildingProcessor;

    /** Schema registry. */
    private final SchemaRegistry schemaRegistry;

    /** Low watermark. */
    private final LowWatermark lowWatermark;

    /** Node properties; {@code colocationEnabled()} selects between colocation and table-based request processing. */
    private final NodeProperties nodeProperties;

    /**
     * Value of the {@code IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK} system property, read once at class initialization.
     * Presumably lets benchmarks skip storage updates — confirm against usages.
     */
    private static final boolean SKIP_UPDATES = getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);

    /** Validates replica primacy of incoming requests (used by {@code invoke}). */
    private final ReplicaPrimacyEngine replicaPrimacyEngine;
    /** Computes operation timestamps and pre-processes table-aware requests. */
    private final TableAwareReplicaRequestPreProcessor tableAwareReplicaRequestPreProcessor;
    /** Catalog versions helper built from the schema sync and catalog services. */
    private final ReliableCatalogVersions reliableCatalogVersions;
    /** Applies RAFT commands for this replication group; shared by several handlers below. */
    private final ReplicationRaftCommandApplicator raftCommandApplicator;
    /** Transaction finish marker backed by the tx manager. */
    private final ReplicaTxFinishMarker replicaTxFinishMarker;

    // Replica request handlers (all created in the constructor).
    private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
    private final TxStateCommitPartitionReplicaRequestHandler txStateCommitPartitionReplicaRequestHandler;
    private final TxRecoveryMessageHandler txRecoveryMessageHandler;
    private final TxCleanupRecoveryRequestHandler txCleanupRecoveryRequestHandler;
    private final MinimumActiveTxTimeReplicaRequestHandler minimumActiveTxTimeReplicaRequestHandler;
    private final VacuumTxStateReplicaRequestHandler vacuumTxStateReplicaRequestHandler;
    private final BuildIndexReplicaRequestHandler buildIndexReplicaRequestHandler;

    /**
     * Creates the partition replica listener and wires up the request handlers it delegates to.
     *
     * @param mvDataStorage Data storage.
     * @param raftCommandRunner Raft client.
     * @param txManager Transaction manager.
     * @param lockManager Lock manager.
     * @param scanRequestExecutor Executor that runs async scan tasks (used to avoid deep recursive calls).
     * @param replicationGroupId Replication group id.
     * @param tableId Table id.
     * @param indexesLockers Index lock helper objects.
     * @param pkIndexStorage Pk index storage.
     * @param secondaryIndexStorages Secondary index storages.
     * @param clockService Clock service.
     * @param safeTime Safe time clock.
     * @param txStatePartitionStorage Transaction state storage.
     * @param transactionStateResolver Transaction state resolver.
     * @param storageUpdateHandler Handler that processes updates writing them to storage.
     * @param validationSchemasSource Source of schemas for schema compatibility validation; also passed to the tx finish handler.
     * @param localNode Instance of the local node.
     * @param schemaSyncService Schema synchronization service.
     * @param catalogService Catalog service.
     * @param placementDriver Placement driver, used to validate replica primacy.
     * @param clusterNodeResolver Node resolver.
     * @param remotelyTriggeredResourceRegistry Resource registry.
     * @param schemaRegistry Schema registry.
     * @param indexMetaStorage Index meta storage.
     * @param lowWatermark Low watermark.
     * @param failureProcessor Failure processor, passed to the tx cleanup recovery request handler.
     * @param nodeProperties Node properties; {@code colocationEnabled()} selects between colocation and table-based processing.
     */
    public PartitionReplicaListener(
            MvPartitionStorage mvDataStorage,
            RaftCommandRunner raftCommandRunner,
            TxManager txManager,
            LockManager lockManager,
            Executor scanRequestExecutor,
            PartitionGroupId replicationGroupId,
            int tableId,
            Supplier<Map<Integer, IndexLocker>> indexesLockers,
            Lazy<TableSchemaAwareIndexStorage> pkIndexStorage,
            Supplier<Map<Integer, TableSchemaAwareIndexStorage>> secondaryIndexStorages,
            ClockService clockService,
            PendingComparableValuesTracker<HybridTimestamp, Void> safeTime,
            TxStatePartitionStorage txStatePartitionStorage,
            TransactionStateResolver transactionStateResolver,
            StorageUpdateHandler storageUpdateHandler,
            ValidationSchemasSource validationSchemasSource,
            ClusterNode localNode,
            SchemaSyncService schemaSyncService,
            CatalogService catalogService,
            LeasePlacementDriver placementDriver,
            ClusterNodeResolver clusterNodeResolver,
            RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry,
            SchemaRegistry schemaRegistry,
            IndexMetaStorage indexMetaStorage,
            LowWatermark lowWatermark,
            FailureProcessor failureProcessor,
            NodeProperties nodeProperties
    ) {
        this.mvDataStorage = mvDataStorage;
        this.raftCommandRunner = raftCommandRunner;
        this.txManager = txManager;
        this.lockManager = lockManager;
        this.scanRequestExecutor = scanRequestExecutor;
        this.indexesLockers = indexesLockers;
        this.pkIndexStorage = pkIndexStorage;
        this.secondaryIndexStorages = secondaryIndexStorages;
        this.clockService = clockService;
        this.safeTime = safeTime;
        this.transactionStateResolver = transactionStateResolver;
        this.storageUpdateHandler = storageUpdateHandler;
        this.schemaSyncService = schemaSyncService;
        this.catalogService = catalogService;
        this.remotelyTriggeredResourceRegistry = remotelyTriggeredResourceRegistry;
        this.schemaRegistry = schemaRegistry;
        this.lowWatermark = lowWatermark;
        this.nodeProperties = nodeProperties;
        this.replicationGroupId = replicationGroupId;
        this.tableId = tableId;
        // Table-level lock key: table id plus the partition id of the replication group.
        this.tableLockKey = new TablePartitionId(tableId, replicationGroupId.partitionId());

        // Also handed to the table-aware request pre-processor below.
        this.schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);

        // Shares this listener's busy lock. Must be created before buildIndexReplicaRequestHandler, which takes its tracker().
        indexBuildingProcessor = new PartitionReplicaBuildIndexProcessor(busyLock, tableId, indexMetaStorage, catalogService);

        // Used by invoke() to validate primacy before processing a request.
        replicaPrimacyEngine = new ReplicaPrimacyEngine(placementDriver, clockService, replicationGroupId, localNode);

        this.tableAwareReplicaRequestPreProcessor = new TableAwareReplicaRequestPreProcessor(
                clockService,
                schemaCompatValidator,
                schemaSyncService,
                nodeProperties
        );

        reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
        // Shared by the minimum-active-tx-time, vacuum and build-index handlers below.
        raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
        replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager);
        // Shared by the tx state commit partition handler and the tx recovery message handler.
        TxRecoveryEngine txRecoveryEngine = new TxRecoveryEngine(
                txManager,
                clusterNodeResolver,
                replicationGroupId,
                this::createAbandonedTxRecoveryEnlistment
        );

        txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler(
                txStatePartitionStorage,
                clockService,
                txManager,
                validationSchemasSource,
                schemaSyncService,
                catalogService,
                raftCommandRunner,
                replicationGroupId
        );

        txStateCommitPartitionReplicaRequestHandler = new TxStateCommitPartitionReplicaRequestHandler(
                txStatePartitionStorage,
                txManager,
                clusterNodeResolver,
                localNode,
                txRecoveryEngine
        );

        txRecoveryMessageHandler = new TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, txRecoveryEngine);

        txCleanupRecoveryRequestHandler = new TxCleanupRecoveryRequestHandler(
                txStatePartitionStorage,
                txManager,
                failureProcessor,
                replicationGroupId
        );

        minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler(
                clockService,
                raftCommandApplicator);

        vacuumTxStateReplicaRequestHandler = new VacuumTxStateReplicaRequestHandler(raftCommandApplicator);

        // Depends on indexBuildingProcessor having been created above.
        buildIndexReplicaRequestHandler = new BuildIndexReplicaRequestHandler(
                indexMetaStorage,
                indexBuildingProcessor.tracker(),
                safeTime,
                raftCommandApplicator);
    }

    // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove.
    /**
     * Builds the enlistment handed to the tx recovery engine for rolling back a transaction on the given node.
     * Only valid when colocation is disabled.
     *
     * @param recoveryNode Node the enlistment refers to.
     * @return Enlistment with a zero consistency token for this listener's table.
     */
    private PendingTxPartitionEnlistment createAbandonedTxRecoveryEnlistment(ClusterNode recoveryNode) {
        assert !nodeProperties.colocationEnabled() : "Unexpected method call within colocation enabled.";

        String nodeName = recoveryNode.name();

        // Rollback does not need an enlistment consistency token.
        long noConsistencyToken = 0L;

        // Without colocation the replication group is always a table partition, so the downcast is safe.
        TablePartitionId tablePartitionId = (TablePartitionId) replicationGroupId;

        return new PendingTxPartitionEnlistment(nodeName, noConsistencyToken, tablePartitionId.tableId());
    }

    /**
     * Validates replica primacy for the request and, once validated, processes it.
     *
     * @param request Incoming replica request.
     * @param senderId Id of the request sender.
     * @return Future with the replica result.
     */
    @Override
    public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) {
        var primacyCheck = replicaPrimacyEngine.validatePrimacy(request);

        return primacyCheck.thenCompose(primacy -> processRequestInContext(request, primacy, senderId));
    }

    /**
     * Processes the request using primacy information supplied by the caller; unlike {@link #invoke}, no primacy check is done here.
     *
     * @param request Incoming replica request.
     * @param replicaPrimacy Replica primacy information supplied by the caller.
     * @param senderId Id of the request sender.
     * @return Future with the replica result.
     */
    @Override
    public CompletableFuture<ReplicaResult> process(ReplicaRequest request, ReplicaPrimacy replicaPrimacy, UUID senderId) {
        CompletableFuture<ReplicaResult> resultFuture = processRequestInContext(request, replicaPrimacy, senderId);

        return resultFuture;
    }

    /** Processes the request and normalizes whatever it produced into a {@link ReplicaResult}. */
    private CompletableFuture<ReplicaResult> processRequestInContext(
            ReplicaRequest request,
            ReplicaPrimacy replicaPrimacy,
            UUID senderId
    ) {
        CompletableFuture<?> rawResultFuture = processRequest(request, replicaPrimacy, senderId);

        return rawResultFuture.thenApply(rawResult -> wrapInReplicaResultIfNeeded(rawResult));
    }

    /**
     * Returns {@code res} unchanged if it already is a {@link ReplicaResult}; otherwise wraps it as the result value with a
     * {@code null} second argument.
     */
    private static ReplicaResult wrapInReplicaResultIfNeeded(Object res) {
        return res instanceof ReplicaResult ? (ReplicaResult) res : new ReplicaResult(res, null);
    }

    /**
     * Returns the RAFT client. If the command runner is an {@link ExecutorInclinedRaftCommandRunner}, the runner it decorates is
     * returned instead of the decorator.
     */
    @Override
    public RaftCommandRunner raftClient() {
        return raftCommandRunner instanceof ExecutorInclinedRaftCommandRunner
                ? ((ExecutorInclinedRaftCommandRunner) raftCommandRunner).decoratedCommandRunner()
                : raftCommandRunner;
    }

    /**
     * Routes a replica request to the code that handles it.
     *
     * <p>For read-write requests of non-full transactions, the transaction is first recorded as {@code PENDING} in the tx manager.
     * Tx recovery, tx cleanup recovery, estimated size and peers/learners change requests are handled directly. Everything else goes
     * to {@code processOperationRequestWithTxOperationManagementLogic}. When colocation is disabled and the request carries an
     * operation timestamp, that call happens only after table-aware pre-processing.
     *
     * @param request Request to process.
     * @param replicaPrimacy Replica primacy information for the request.
     * @param senderId Id of the request sender.
     * @return Future with the raw processing result (not yet wrapped into a {@link ReplicaResult}).
     */
    private CompletableFuture<?> processRequest(ReplicaRequest request, ReplicaPrimacy replicaPrimacy, UUID senderId) {
        assert !(request instanceof SchemaVersionAwareReplicaRequest)
                || ((SchemaVersionAwareReplicaRequest) request).schemaVersion() > 0 : "No schema version passed?";

        if (request instanceof ReadWriteReplicaRequest) {
            ReadWriteReplicaRequest rwRequest = (ReadWriteReplicaRequest) request;

            // Full transactions don't need their state to be saved.
            if (!rwRequest.full()) {
                txManager.updateTxMeta(rwRequest.transactionId(), previousMeta -> new TxStateMeta(
                        PENDING,
                        rwRequest.coordinatorId(),
                        rwRequest.commitPartitionId().asReplicationGroupId(),
                        null,
                        previousMeta == null ? null : previousMeta.tx(),
                        previousMeta == null ? null : previousMeta.isFinishedDueToTimeout()
                ));
            }
        }

        // Recovery requests are only expected when colocation is disabled.
        if (request instanceof TxRecoveryMessage) {
            assert !nodeProperties.colocationEnabled() : "Unexpected method call within colocation enabled.";

            return txRecoveryMessageHandler.handle((TxRecoveryMessage) request, senderId);
        }

        if (request instanceof TxCleanupRecoveryRequest) {
            assert !nodeProperties.colocationEnabled() : "Unexpected method call within colocation enabled.";

            return txCleanupRecoveryRequestHandler.handle((TxCleanupRecoveryRequest) request);
        }

        if (request instanceof GetEstimatedSizeRequest) {
            return processGetEstimatedSizeRequest();
        }

        if (request instanceof ChangePeersAndLearnersAsyncReplicaRequest) {
            return processChangePeersAndLearnersReplicaRequest((ChangePeersAndLearnersAsyncReplicaRequest) request);
        }

        @Nullable HybridTimestamp operationTs = tableAwareReplicaRequestPreProcessor.getOperationTimestamp(request);
        @Nullable HybridTimestamp directRoTs = request instanceof ReadOnlyDirectReplicaRequest ? operationTs : null;
        boolean colocationEnabled = nodeProperties.colocationEnabled();

        if (!colocationEnabled && operationTs != null) {
            // Table-aware request with an operation timestamp: pre-process it first.
            return tableAwareReplicaRequestPreProcessor.preProcessTableAwareRequest(request, replicaPrimacy, senderId)
                    .thenCompose(ignored ->
                            processOperationRequestWithTxOperationManagementLogic(senderId, request, replicaPrimacy, directRoTs));
        }

        // Either colocation is enabled, or there is no operation timestamp (so no schema validation is needed).
        // A null operationTs always yields a null directRoTs, so passing it below is equivalent to passing null.
        assert colocationEnabled || directRoTs == null;

        return processOperationRequestWithTxOperationManagementLogic(senderId, request, replicaPrimacy, directRoTs);
    }

    /**
     * Handles an estimated size request.
     *
     * @return Already completed future with the estimated size reported by the partition data storage.
     */
    private CompletableFuture<Long> processGetEstimatedSizeRequest() {
        long estimatedSize = mvDataStorage.estimatedSize();

        return completedFuture(estimatedSize);
    }

    /**
     * Handles a request to change the peers and learners of the partition RAFT group.
     *
     * <p>The RAFT leader is refreshed first. If the leader cannot be obtained within the timeout, the change is skipped. The change
     * is also skipped if the request's enlistment consistency token no longer matches the primary replica. Otherwise, the
     * configuration built from the request's pending assignments is submitted with the leader's term.
     *
     * @param request Change peers and learners request.
     * @return Future completed with the result of {@code changePeersAndLearnersAsync}, or with {@code null} if the change is skipped.
     *      If getting the leader fails for a reason other than a timeout, the future completes exceptionally with
     *      {@link IgniteInternalException}.
     */
    private CompletableFuture<Void> processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest request) {
        TablePartitionId replicaGrpId = (TablePartitionId) request.groupId().asReplicationGroupId();

        RaftGroupService raftClient = raftCommandRunner instanceof RaftGroupService
                ? (RaftGroupService) raftCommandRunner
                : ((RaftGroupService) ((ExecutorInclinedRaftCommandRunner) raftCommandRunner).decoratedCommandRunner());

        return raftClient.refreshAndGetLeaderWithTerm()
                .exceptionally(throwable -> {
                    throwable = unwrapCause(throwable);

                    // A timeout is not an error: the change is simply skipped (NO_LEADER is handled below).
                    if (throwable instanceof TimeoutException) {
                        LOG.info(
                                "Node couldn't get the leader within timeout so the changing peers is skipped [grp={}].",
                                replicaGrpId
                        );

                        return LeaderWithTerm.NO_LEADER;
                    }

                    throw new IgniteInternalException(
                            INTERNAL_ERR,
                            "Failed to get a leader for the RAFT replication group [grp=" + replicaGrpId + "].",
                            throwable
                    );
                })
                .thenCompose(leaderWithTerm -> {
                    if (leaderWithTerm.isEmpty() || !replicaPrimacyEngine.tokenStillMatchesPrimary(request.enlistmentConsistencyToken())) {
                        return nullCompletedFuture();
                    }

                    // The leader is known and the request's token still matches the primary: update the RAFT configuration.
                    LOG.debug("Current node={} is the leader of partition raft group={}. "
                                    + "Initiate rebalance process for partition={}, table={}",
                            leaderWithTerm.leader(),
                            replicaGrpId,
                            replicaGrpId.partitionId(),
                            replicaGrpId.tableId()
                    );

                    return raftClient.changePeersAndLearnersAsync(peersConfigurationFromMessage(request), leaderWithTerm.term());
                });
    }

    /**
     * Builds the RAFT peers and learners configuration from the pending assignments serialized in the request.
     *
     * @param request Change peers and learners request.
     * @return Peers and learners built from the nodes of the pending assignments.
     */
    private static PeersAndLearners peersConfigurationFromMessage(ChangePeersAndLearnersAsyncReplicaRequest request) {
        return fromAssignments(fromBytes(request.pendingAssignments()).nodes());
    }

    /**
     * Attaches a delayed acknowledgement processor to a replica result. Does nothing if there is no result.
     *
     * @param result Replica result, may be {@code null}.
     * @param proc Delayed acknowledgement processor to set, may be {@code null}.
     */
    private static void setDelayedAckProcessor(@Nullable ReplicaResult result, @Nullable BiConsumer<Object, Throwable> proc) {
        if (result == null) {
            return;
        }

        result.delayedAckProcessor = proc;
    }

    /**
     * Dispatches an operation request to the handler for its concrete type.
     *
     * <p>Read-write row operations (single row, single row PK, multi row, multi row PK, swap) are executed through
     * {@code appendTxCommand} with an {@link OperationId} built from the sender id and the request timestamp. Once such an operation
     * completes, the request's delayed-ack processor is attached to the result (if any). Scan, cleanup, read-only and service
     * requests are routed to their dedicated handlers.
     *
     * @param senderId Sender id.
     * @param request Request.
     * @param replicaPrimacy Replica primacy information.
     * @param opStartTsIfDirectRo Start timestamp in case of direct RO tx, {@code null} otherwise.
     * @return Future with the operation result.
     * @throws UnsupportedReplicaRequestException If the request type is not handled by this method.
     */
    private CompletableFuture<?> processOperationRequest(
            UUID senderId,
            ReplicaRequest request,
            ReplicaPrimacy replicaPrimacy,
            @Nullable HybridTimestamp opStartTsIfDirectRo
    ) {
        if (request instanceof ReadWriteSingleRowReplicaRequest) {
            var req = (ReadWriteSingleRowReplicaRequest) request;

            var opId = new OperationId(senderId, req.timestamp().longValue());

            return appendTxCommand(
                    req.transactionId(),
                    opId,
                    req.requestType(),
                    req.full(),
                    () -> processSingleEntryAction(req, replicaPrimacy.leaseStartTime()).whenComplete(
                            (r, e) -> setDelayedAckProcessor(r, req.delayedAckProcessor()))
            );
        } else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
            var req = (ReadWriteSingleRowPkReplicaRequest) request;

            var opId = new OperationId(senderId, req.timestamp().longValue());

            return appendTxCommand(
                    req.transactionId(),
                    opId,
                    req.requestType(),
                    req.full(),
                    () -> processSingleEntryAction(req, replicaPrimacy.leaseStartTime()).whenComplete(
                            (r, e) -> setDelayedAckProcessor(r, req.delayedAckProcessor()))
            );
        } else if (request instanceof ReadWriteMultiRowReplicaRequest) {
            var req = (ReadWriteMultiRowReplicaRequest) request;

            var opId = new OperationId(senderId, req.timestamp().longValue());

            return appendTxCommand(
                    req.transactionId(),
                    opId,
                    req.requestType(),
                    req.full(),
                    () -> processMultiEntryAction(req, replicaPrimacy.leaseStartTime()).whenComplete(
                            (r, e) -> setDelayedAckProcessor(r, req.delayedAckProcessor()))
            );
        } else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
            var req = (ReadWriteMultiRowPkReplicaRequest) request;

            var opId = new OperationId(senderId, req.timestamp().longValue());

            return appendTxCommand(
                    req.transactionId(),
                    opId,
                    req.requestType(),
                    req.full(),
                    () -> processMultiEntryAction(req, replicaPrimacy.leaseStartTime()).whenComplete(
                            (r, e) -> setDelayedAckProcessor(r, req.delayedAckProcessor()))
            );
        } else if (request instanceof ReadWriteSwapRowReplicaRequest) {
            var req = (ReadWriteSwapRowReplicaRequest) request;

            var opId = new OperationId(senderId, req.timestamp().longValue());

            return appendTxCommand(
                    req.transactionId(),
                    opId,
                    req.requestType(),
                    req.full(),
                    () -> processTwoEntriesAction(req, replicaPrimacy.leaseStartTime()).whenComplete(
                            (r, e) -> setDelayedAckProcessor(r, req.delayedAckProcessor()))
            );
        } else if (request instanceof ReadWriteScanRetrieveBatchReplicaRequest) {
            var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;

            // Scan's request.full() has a slightly different semantics than the same field in other requests -
            // it identifies an implicit transaction. Please note that request.full() is always false in the following `appendTxCommand`.
            // We treat SCAN as 2pc and only switch to a 1pc mode if all table rows fit in the bucket and the transaction is implicit.
            // See `req.full() && (err != null || rows.size() < req.batchSize())` condition.
            // If they don't fit the bucket, the transaction is treated as 2pc.
            txManager.updateTxMeta(req.transactionId(), old -> new TxStateMeta(
                    PENDING,
                    req.coordinatorId(),
                    req.commitPartitionId().asReplicationGroupId(),
                    null,
                    old == null ? null : old.tx(),
                    old == null ? null : old.isFinishedDueToTimeout()
            ));

            var opId = new OperationId(senderId, req.timestamp().longValue());

            // Implicit RW scan can be committed locally on a last batch or error.
            return appendTxCommand(req.transactionId(), opId, RW_SCAN, false, () -> processScanRetrieveBatchAction(req))
                    .thenCompose(rows -> {
                        // Schema validation after taking locks is skipped when the batch has no non-null rows.
                        if (allElementsAreNull(rows)) {
                            return completedFuture(rows);
                        } else {
                            return validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
                                    .thenApply(ignored -> rows);
                        }
                    })
                    .whenComplete((rows, err) -> {
                        // Implicit scan: release locks after the last (incomplete) batch or on failure.
                        // `err != null` is checked first, so `rows` is only dereferenced on success.
                        if (req.full() && (err != null || rows.size() < req.batchSize())) {
                            releaseTxLocks(req.transactionId());
                        }
                    });
        } else if (request instanceof ScanCloseReplicaRequest) {
            processScanCloseAction((ScanCloseReplicaRequest) request);

            return nullCompletedFuture();
        } else if (request instanceof TxFinishReplicaRequest) {
            // This handler must not be reached when colocation is enabled.
            assert !nodeProperties.colocationEnabled() : request;

            return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request);
        } else if (request instanceof WriteIntentSwitchReplicaRequest) {
            return processWriteIntentSwitchAction((WriteIntentSwitchReplicaRequest) request);
        } else if (request instanceof TableWriteIntentSwitchReplicaRequest) {
            return processTableWriteIntentSwitchAction((TableWriteIntentSwitchReplicaRequest) request);
        } else if (request instanceof ReadOnlySingleRowPkReplicaRequest) {
            return processReadOnlySingleEntryAction((ReadOnlySingleRowPkReplicaRequest) request, replicaPrimacy.isPrimary());
        } else if (request instanceof ReadOnlyMultiRowPkReplicaRequest) {
            return processReadOnlyMultiEntryAction((ReadOnlyMultiRowPkReplicaRequest) request, replicaPrimacy.isPrimary());
        } else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
            return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, replicaPrimacy.isPrimary());
        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
            return processReplicaSafeTimeSyncRequest(replicaPrimacy.isPrimary());
        } else if (request instanceof BuildIndexReplicaRequest) {
            return buildIndexReplicaRequestHandler.handle((BuildIndexReplicaRequest) request);
        } else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
            return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request, opStartTsIfDirectRo);
        } else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
            return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request, opStartTsIfDirectRo);
        } else if (request instanceof TxStateCommitPartitionRequest) {
            // This handler must not be reached when colocation is enabled.
            assert !nodeProperties.colocationEnabled() : request;

            return txStateCommitPartitionReplicaRequestHandler.handle((TxStateCommitPartitionRequest) request);
        } else if (request instanceof VacuumTxStateReplicaRequest) {
            // This handler must not be reached when colocation is enabled.
            assert !nodeProperties.colocationEnabled() : request;

            return vacuumTxStateReplicaRequestHandler.handle((VacuumTxStateReplicaRequest) request);
        } else if (request instanceof UpdateMinimumActiveTxBeginTimeReplicaRequest) {
            // This handler must not be reached when colocation is enabled.
            assert !nodeProperties.colocationEnabled() : request;

            return minimumActiveTxTimeReplicaRequestHandler.handle((UpdateMinimumActiveTxBeginTimeReplicaRequest) request);
        }

        // Unknown request.
        throw new UnsupportedReplicaRequestException(request.getClass());
    }

    /**
     * Processes a scan retrieve batch request of a read-only transaction.
     *
     * <p>The batch is read once the replica is safe to read at the request's read timestamp. A primary whose clock has already
     * passed that timestamp reads immediately; otherwise the read waits for safe time. The data source depends on the request:
     * <ul>
     *     <li>an exact key lookup when an index and an exact key are given;</li>
     *     <li>a sorted index range scan when an index is given without an exact key;</li>
     *     <li>a full partition scan when no index is given.</li>
     * </ul>
     *
     * @param request Read only retrieve batch request.
     * @param isPrimary Whether the given replica is primary.
     * @return Result future.
     */
    private CompletableFuture<List<BinaryRow>> processReadOnlyScanRetrieveBatchAction(
            ReadOnlyScanRetrieveBatchReplicaRequest request,
            boolean isPrimary
    ) {
        UUID txId = request.transactionId();
        int batchSize = request.batchSize();
        HybridTimestamp readTs = request.readTimestamp();

        FullyQualifiedResourceId scanCursorId = cursorId(txId, request.scanId());

        CompletableFuture<Void> readyToRead;

        if (isPrimaryInTimestamp(isPrimary, readTs)) {
            readyToRead = nullCompletedFuture();
        } else {
            readyToRead = safeTime.waitFor(readTs);
        }

        Integer indexId = request.indexToUse();

        if (indexId == null) {
            return readyToRead.thenCompose(
                    ignored -> retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), readTs, scanCursorId, batchSize)
            );
        }

        TableSchemaAwareIndexStorage indexStorage = secondaryIndexStorages.get().get(indexId);

        if (indexStorage == null) {
            throw new AssertionError("Index not found: uuid=" + indexId);
        }

        if (request.exactKey() == null) {
            assert indexStorage.storage() instanceof SortedIndexStorage;

            return readyToRead.thenCompose(ignored -> scanSortedIndex(request, indexStorage));
        }

        assert request.lowerBoundPrefix() == null && request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds.";

        return readyToRead.thenCompose(ignored -> lookupIndex(request, indexStorage));
    }

    /**
     * Extracts exactly {@code count} entries, or fewer if the cursor becomes exhausted, from the partition scan cursor as of the
     * given timestamp.
     *
     * <p>Each read result is resolved by {@code resolveReadResult}. Results that resolve to no row are skipped. While fewer than
     * {@code count} rows have been collected and the cursor still has entries, the method calls itself for the remaining amount.
     * In the innermost call, the cursor is closed if the collected batch is smaller than requested.
     *
     * @param txId Transaction id, passed to read result resolution (RW only, per the original note).
     * @param txCoordinatorId Transaction coordinator id, used when registering the cursor resource.
     * @param readTimestamp Timestamp as of which data is read. If {@code null}, the scan uses {@link HybridTimestamp#MAX_VALUE}.
     * @param cursorId Cursor id.
     * @param count Number of entries to extract.
     * @return Result future.
     */
    private CompletableFuture<List<BinaryRow>> retrieveExactEntriesUntilCursorEmpty(
            UUID txId,
            UUID txCoordinatorId,
            @Nullable HybridTimestamp readTimestamp,
            FullyQualifiedResourceId cursorId,
            int count
    ) {
        // The same cursorId is registered again by the recursive calls below, so the registry presumably returns the already
        // registered cursor on subsequent calls (NOTE(review): confirm against RemotelyTriggeredResourceRegistry).
        PartitionTimestampCursor cursor =
                remotelyTriggeredResourceRegistry.<CursorResource>register(
                        cursorId,
                        txCoordinatorId,
                        () -> new CursorResource(
                                mvDataStorage.scan(readTimestamp == null ? HybridTimestamp.MAX_VALUE : readTimestamp)
                        )
                ).cursor();

        var resolutionFuts = new ArrayList<CompletableFuture<TimedBinaryRow>>(count);

        while (resolutionFuts.size() < count && cursor.hasNext()) {
            ReadResult readResult = cursor.next();
            HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();

            // For a write intent that has a committed version, offer the committed row at its newest commit timestamp
            // as a candidate to the resolution logic.
            TimedBinaryRow candidate;
            if (newestCommitTimestamp == null || !readResult.isWriteIntent()) {
                candidate = null;
            } else {
                BinaryRow committedRow = cursor.committed(newestCommitTimestamp);

                candidate = committedRow == null ? null : new TimedBinaryRow(committedRow, newestCommitTimestamp);
            }

            resolutionFuts.add(resolveReadResult(readResult, txId, readTimestamp, () -> candidate));
        }

        return allOf(resolutionFuts.toArray(new CompletableFuture[0])).thenCompose(unused -> {
            var rows = new ArrayList<BinaryRow>(count);

            // All futures are completed at this point, so join() does not block.
            for (CompletableFuture<TimedBinaryRow> resolutionFut : resolutionFuts) {
                TimedBinaryRow resolvedReadResult = resolutionFut.join();

                if (resolvedReadResult != null && resolvedReadResult.binaryRow() != null) {
                    rows.add(resolvedReadResult.binaryRow());
                }
            }

            // Some entries resolved to no row: read further from the same cursor to fill up the batch.
            if (rows.size() < count && cursor.hasNext()) {
                return retrieveExactEntriesUntilCursorEmpty(txId, txCoordinatorId, readTimestamp, cursorId, count - rows.size())
                        .thenApply(binaryRows -> {
                            rows.addAll(binaryRows);

                            return rows;
                        });
            } else {
                return completedFuture(closeCursorIfBatchNotFull(rows, count, cursorId));
            }
        });
    }

    /**
     * Extracts exactly {@code count} entries, or fewer if the cursor becomes exhausted, from a scan over the latest data, for an RW
     * transaction. Every returned row is additionally checked for backward schema compatibility with the transaction.
     *
     * @param txId Transaction id.
     * @param txCoordinatorId Transaction coordinator id.
     * @param cursorId Cursor id.
     * @param count Number of entries to extract.
     * @return Future with the extracted rows. It fails if any row fails the schema compatibility check.
     */
    private CompletableFuture<List<BinaryRow>> retrieveExactEntriesUntilCursorEmpty(
            UUID txId,
            UUID txCoordinatorId,
            FullyQualifiedResourceId cursorId,
            int count
    ) {
        CompletableFuture<List<BinaryRow>> batchFut = retrieveExactEntriesUntilCursorEmpty(txId, txCoordinatorId, null, cursorId, count);

        return batchFut.thenCompose(rows -> {
            if (nullOrEmpty(rows)) {
                return emptyListCompletedFuture();
            }

            CompletableFuture<?>[] validations = rows.stream()
                    .map(row -> validateBackwardCompatibility(row, txId))
                    .toArray(CompletableFuture[]::new);

            return allOf(validations).thenApply(ignored -> rows);
        });
    }

    /**
     * Validates, via the schema compatibility validator, that a row read by the transaction is backward compatible with it.
     *
     * @param row Row to validate.
     * @param txId Transaction id.
     * @return Future that completes normally on success. On failure, it completes exceptionally with
     *      {@link IncompatibleSchemaVersionException} (the row has a newer schema version than the transaction's).
     */
    private CompletableFuture<Void> validateBackwardCompatibility(BinaryRow row, UUID txId) {
        int rowSchemaVersion = row.schemaVersion();

        return schemaCompatValidator.validateBackwards(rowSchemaVersion, tableId(), txId).thenAccept(result -> {
            if (result.isSuccessful()) {
                return;
            }

            String message = String.format(
                    "Operation failed because it tried to access a row with newer schema version than transaction's [table=%s, "
                            + "txSchemaVersion=%d, rowSchemaVersion=%d]",
                    result.failedTableName(), result.fromSchemaVersion(), result.toSchemaVersion()
            );

            throw new IncompatibleSchemaVersionException(message);
        });
    }

    /**
     * Processes a single-row primary key read ({@code RO_GET}) of a read-only transaction.
     *
     * <p>Unless this replica is primary and its clock has already passed the read timestamp, the read waits for safe time to
     * reach the read timestamp before the row is resolved.
     *
     * @param request Read only single entry request.
     * @param isPrimary Whether the given replica is primary.
     * @return Result future.
     * @throws IgniteInternalException If the request type is not {@code RO_GET}.
     */
    private CompletableFuture<BinaryRow> processReadOnlySingleEntryAction(ReadOnlySingleRowPkReplicaRequest request, boolean isPrimary) {
        BinaryTuple pk = resolvePk(request.primaryKey());
        HybridTimestamp readTs = request.readTimestamp();

        if (request.requestType() == RO_GET) {
            CompletableFuture<Void> readyToRead;

            if (isPrimaryInTimestamp(isPrimary, readTs)) {
                readyToRead = nullCompletedFuture();
            } else {
                readyToRead = safeTime.waitFor(readTs);
            }

            return readyToRead.thenCompose(ignored -> resolveRowByPkForReadOnly(pk, readTs));
        }

        throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                format("Unknown single request [actionType={}]", request.requestType()));
    }

    /**
     * Tells whether this node is primary and the given timestamp is already in the past according to the node's clock.
     *
     * @param isPrimary Whether the node is primary.
     * @param timestamp Timestamp to check.
     * @return {@code true} if the node is primary and its current clock time is strictly greater than {@code timestamp}.
     */
    private boolean isPrimaryInTimestamp(boolean isPrimary, HybridTimestamp timestamp) {
        if (!isPrimary) {
            return false;
        }

        HybridTimestamp now = clockService.now();

        return now.compareTo(timestamp) > 0;
    }

    /**
     * Processes a multi-row primary key read ({@code RO_GET_ALL}) of a read-only transaction.
     *
     * <p>Unless this replica is primary and its clock has already passed the read timestamp, the read waits for safe time to
     * reach the read timestamp. One row resolution is then started per primary key, and the results are collected with
     * {@code allOfToList}.
     *
     * @param request Read only multiple entries request.
     * @param isPrimary Whether the given replica is primary.
     * @return Result future.
     * @throws IgniteInternalException If the request type is not {@code RO_GET_ALL}.
     */
    private CompletableFuture<List<BinaryRow>> processReadOnlyMultiEntryAction(
            ReadOnlyMultiRowPkReplicaRequest request,
            boolean isPrimary
    ) {
        List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
        HybridTimestamp readTimestamp = request.readTimestamp();

        if (request.requestType() != RO_GET_ALL) {
            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                    format("Unknown multi request [actionType={}]", request.requestType()));
        }

        CompletableFuture<Void> safeReadFuture = isPrimaryInTimestamp(isPrimary, readTimestamp) ? nullCompletedFuture()
                : safeTime.waitFor(readTimestamp);

        return safeReadFuture.thenCompose(unused -> {
            CompletableFuture<BinaryRow>[] resolutionFuts = new CompletableFuture[primaryKeys.size()];

            for (int i = 0; i < primaryKeys.size(); i++) {
                resolutionFuts[i] = resolveRowByPkForReadOnly(primaryKeys.get(i), readTimestamp);
            }

            return allOfToList(resolutionFuts);
        });
    }

    /**
     * Handles a {@link ReplicaSafeTimeSyncRequest}. If this replica is primary and colocation is disabled, it applies a safe time
     * sync command stamped with the current clock time. Otherwise it does nothing.
     *
     * @param isPrimary Whether this replica is primary.
     * @return Future of the applied command, or a completed future if nothing was done.
     */
    private CompletableFuture<?> processReplicaSafeTimeSyncRequest(boolean isPrimary) {
        // With the Colocation feature enabled, safe time is managed on a different level, so the sync is disabled here.
        boolean shouldSync = isPrimary && !nodeProperties.colocationEnabled();

        if (shouldSync) {
            return applyCmdWithExceptionHandling(
                    REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().initiatorTime(clockService.now()).build()
            );
        }

        return nullCompletedFuture();
    }

    /**
     * Closes the scan cursor identified by the request's transaction id and scan id.
     *
     * @param request Scan close request.
     * @throws ReplicationException If closing the cursor fails.
     */
    private void processScanCloseAction(ScanCloseReplicaRequest request) {
        FullyQualifiedResourceId scanCursorId = cursorId(request.transactionId(), request.scanId());

        try {
            remotelyTriggeredResourceRegistry.close(scanCursorId);
        } catch (IgniteException closeError) {
            throw wrapCursorCloseException(closeError);
        }
    }

    /**
     * Closes the cursor if the batch has fewer rows than requested, i.e. no further batches are expected from it.
     *
     * @param rows Rows retrieved for the batch.
     * @param batchSize Requested batch size.
     * @param cursorId Cursor id.
     * @param <T> Row type.
     * @return The same {@code rows} list.
     * @throws ReplicationException If closing the cursor fails.
     */
    private <T> ArrayList<T> closeCursorIfBatchNotFull(ArrayList<T> rows, int batchSize, FullyQualifiedResourceId cursorId) {
        if (rows.size() >= batchSize) {
            // The batch is full, so the scan may continue: keep the cursor open.
            return rows;
        }

        try {
            remotelyTriggeredResourceRegistry.close(cursorId);
        } catch (IgniteException closeError) {
            throw wrapCursorCloseException(closeError);
        }

        return rows;
    }

    /**
     * Wraps an exception thrown while closing a cursor into a {@link ReplicationException} with the {@code CURSOR_CLOSE_ERR} code.
     *
     * @param e Exception thrown while closing the cursor.
     * @return Wrapping exception, having {@code e} as its cause.
     */
    private ReplicationException wrapCursorCloseException(IgniteException e) {
        String message = format("Close cursor exception [replicaGrpId={}, msg={}]", replicationGroupId, e.getMessage());

        return new ReplicationException(CURSOR_CLOSE_ERR, message, e);
    }

    /**
     * Processes a scan retrieve batch request of a read-write transaction.
     *
     * <p>Without an index, the whole partition is scanned under a shared lock on the table. With an index and an exact key, an
     * index lookup is performed. With an index and no exact key, the sorted index is range-scanned.
     *
     * @param request Scan retrieve batch request operation.
     * @return Listener response.
     */
    private CompletableFuture<List<BinaryRow>> processScanRetrieveBatchAction(ReadWriteScanRetrieveBatchReplicaRequest request) {
        Integer indexId = request.indexToUse();

        if (indexId == null) {
            UUID txId = request.transactionId();
            int batchSize = request.batchSize();

            FullyQualifiedResourceId scanCursorId = cursorId(txId, request.scanId());

            return lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.S)
                    .thenCompose(ignored -> retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), scanCursorId, batchSize));
        }

        TableSchemaAwareIndexStorage indexStorage = secondaryIndexStorages.get().get(indexId);

        if (indexStorage == null) {
            throw new AssertionError("Index not found: uuid=" + indexId);
        }

        if (request.exactKey() == null) {
            assert indexStorage.storage() instanceof SortedIndexStorage;

            return scanSortedIndex(request, indexStorage);
        }

        assert request.lowerBoundPrefix() == null && request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds.";

        return lookupIndex(request, indexStorage.storage(), request.coordinatorId());
    }

    /**
     * Looks up index entries matching the request's exact key in an RO transaction and resolves the corresponding rows as of
     * the request's read timestamp.
     *
     * @param request Index lookup request (must have an exact key).
     * @param schemaAwareIndexStorage Index storage.
     * @return Future with the rows of the batch.
     */
    private CompletableFuture<List<BinaryRow>> lookupIndex(
            ReadOnlyScanRetrieveBatchReplicaRequest request,
            TableSchemaAwareIndexStorage schemaAwareIndexStorage
    ) {
        IndexStorage storage = schemaAwareIndexStorage.storage();

        FullyQualifiedResourceId scanCursorId = cursorId(request.transactionId(), request.scanId());

        BinaryTuple exactKey = request.exactKey().asBinaryTuple();

        Cursor<RowId> rowIds = remotelyTriggeredResourceRegistry.<CursorResource>register(
                scanCursorId,
                request.coordinatorId(),
                () -> new CursorResource(storage.get(exactKey))
        ).cursor();

        // Every row id found by the lookup is paired with the lookup key to form an index row.
        Cursor<IndexRow> indexRows = CursorUtils.map(rowIds, rowId -> new IndexRowImpl(exactKey, rowId));

        int batchSize = request.batchSize();

        var batch = new ArrayList<BinaryRow>(batchSize);

        HybridTimestamp readTs = request.readTimestamp();

        return continueReadOnlyIndexScan(
                schemaAwareIndexStorage,
                indexRows,
                readTs,
                batchSize,
                batch,
                tableVersionByTs(readTs)
        ).thenApply(ignored -> closeCursorIfBatchNotFull(batch, batchSize, scanCursorId));
    }

    /**
     * Looks up index entries matching the request's exact key in an RW transaction. A shared lock on the key is acquired first.
     *
     * @param request Index lookup request (must have an exact key).
     * @param indexStorage Index storage.
     * @param txCoordinatorId Transaction coordinator id, used when registering the cursor resource.
     * @return Future with the rows of the batch.
     */
    private CompletableFuture<List<BinaryRow>> lookupIndex(
            ReadWriteScanRetrieveBatchReplicaRequest request,
            IndexStorage indexStorage,
            UUID txCoordinatorId
    ) {
        UUID txId = request.transactionId();
        int batchSize = request.batchSize();

        FullyQualifiedResourceId scanCursorId = cursorId(txId, request.scanId());

        Integer indexId = request.indexToUse();

        BinaryTuple key = request.exactKey().asBinaryTuple();

        // Hash index bucket S lock.
        var keyLock = new LockKey(indexId, key.byteBuffer());

        return lockManager.acquire(txId, keyLock, LockMode.S)
                .thenCompose(ignored -> {
                    Cursor<RowId> rowIds = remotelyTriggeredResourceRegistry.<CursorResource>register(
                            scanCursorId,
                            txCoordinatorId,
                            () -> new CursorResource(indexStorage.get(key))
                    ).cursor();

                    var batch = new ArrayList<BinaryRow>(batchSize);

                    return continueIndexLookup(txId, rowIds, batchSize, batch)
                            .thenApply(unused -> closeCursorIfBatchNotFull(batch, batchSize, scanCursorId));
                });
    }

    /**
     * Scans a sorted index in an RW transaction.
     *
     * <p>The storage scan is opened with only the lower bound. The upper bound is checked by the {@code isUpperBoundAchieved}
     * predicate passed to {@code continueIndexScan}, so that range locks are taken correctly. Locking goes through the
     * {@link SortedIndexLocker} registered for the index.
     *
     * @param request Index scan request.
     * @param schemaAwareIndexStorage Sorted index storage.
     * @return Future with the rows of the batch.
     */
    private CompletableFuture<List<BinaryRow>> scanSortedIndex(
            ReadWriteScanRetrieveBatchReplicaRequest request,
            TableSchemaAwareIndexStorage schemaAwareIndexStorage
    ) {
        var indexStorage = (SortedIndexStorage) schemaAwareIndexStorage.storage();

        UUID txId = request.transactionId();

        FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());

        Integer indexId = request.indexToUse();

        BinaryTupleMessage lowerBoundMessage = request.lowerBoundPrefix();
        BinaryTupleMessage upperBoundMessage = request.upperBoundPrefix();

        BinaryTuplePrefix lowerBound = lowerBoundMessage == null ? null : lowerBoundMessage.asBinaryTuplePrefix();
        BinaryTuplePrefix upperBound = upperBoundMessage == null ? null : upperBoundMessage.asBinaryTuplePrefix();

        int flags = request.flags();

        BinaryTupleComparator comparator = StorageUtils.binaryTupleComparator(indexStorage.indexDescriptor().columns());

        // Returns true once the scan has reached the upper bound: on a null row (presumably the end of the data), or when the
        // row compares >= the upper bound. Without an upper bound, the scan is unbounded from above.
        Predicate<IndexRow> isUpperBoundAchieved = indexRow -> {
            if (indexRow == null) {
                return true;
            }

            if (upperBound == null) {
                return false;
            }

            ByteBuffer buffer = upperBound.byteBuffer();

            // For an inclusive upper bound, set EQUALITY_FLAG on the bound's header byte. Presumably this makes rows matching
            // the prefix compare below the bound, i.e. they are included. NOTE(review): this writes into the buffer returned by
            // byteBuffer() on every evaluation. Whether that mutates upperBound itself depends on byteBuffer(); since the write
            // is an OR, repeated evaluations are idempotent.
            if ((flags & SortedIndexStorage.LESS_OR_EQUAL) != 0) {
                byte boundFlags = buffer.get(0);

                buffer.put(0, (byte) (boundFlags | BinaryTupleCommon.EQUALITY_FLAG));
            }

            return comparator.compare(indexRow.indexColumns().byteBuffer(), buffer) >= 0;
        };

        Cursor<IndexRow> cursor = remotelyTriggeredResourceRegistry.<CursorResource>register(
                cursorId,
                request.coordinatorId(),
                () -> new CursorResource(indexStorage.scan(
                        lowerBound,
                        // We have to handle upperBound on a level of replication listener,
                        // for correctness of taking of a range lock.
                        null,
                        flags
                ))
        ).cursor();

        SortedIndexLocker indexLocker = (SortedIndexLocker) indexesLockers.get().get(indexId);

        int batchCount = request.batchSize();

        var result = new ArrayList<BinaryRow>(batchCount);

        // Rows are upgraded/validated against the table version at the transaction's begin timestamp.
        return continueIndexScan(
                txId,
                schemaAwareIndexStorage,
                indexLocker,
                cursor,
                batchCount,
                result,
                isUpperBoundAchieved,
                tableVersionByTs(beginTimestamp(txId))
        ).thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, cursorId));
    }

    /**
     * Scans a sorted index in an RO transaction. Both bounds are passed to the storage's read-only scan, and no locks are taken.
     *
     * @param request Index scan request.
     * @param schemaAwareIndexStorage Sorted index storage.
     * @return Future with the rows of the batch.
     */
    private CompletableFuture<List<BinaryRow>> scanSortedIndex(
            ReadOnlyScanRetrieveBatchReplicaRequest request,
            TableSchemaAwareIndexStorage schemaAwareIndexStorage
    ) {
        var sortedStorage = (SortedIndexStorage) schemaAwareIndexStorage.storage();

        FullyQualifiedResourceId scanCursorId = cursorId(request.transactionId(), request.scanId());

        BinaryTupleMessage lowerMsg = request.lowerBoundPrefix();
        BinaryTupleMessage upperMsg = request.upperBoundPrefix();

        BinaryTuplePrefix lower = lowerMsg != null ? lowerMsg.asBinaryTuplePrefix() : null;
        BinaryTuplePrefix upper = upperMsg != null ? upperMsg.asBinaryTuplePrefix() : null;

        int flags = request.flags();

        Cursor<IndexRow> indexRows = remotelyTriggeredResourceRegistry.<CursorResource>register(
                scanCursorId,
                request.coordinatorId(),
                () -> new CursorResource(sortedStorage.readOnlyScan(lower, upper, flags))
        ).cursor();

        int batchSize = request.batchSize();

        var batch = new ArrayList<BinaryRow>(batchSize);

        HybridTimestamp readTs = request.readTimestamp();

        return continueReadOnlyIndexScan(schemaAwareIndexStorage, indexRows, readTs, batchSize, batch, tableVersionByTs(readTs))
                .thenApply(ignored -> closeCursorIfBatchNotFull(batch, batchSize, scanCursorId));
    }

    /**
     * Read-only index scan loop.
     *
     * <p>Takes the next index row from the cursor and reads the data row it points to at {@code readTimestamp`. The row is upgraded to
     * {@code tableVersion} and added to {@code result} when it still matches the index row. The loop continues asynchronously on
     * {@code scanRequestExecutor} until the batch is full or the cursor is exhausted.
     *
     * @param schemaAwareIndexStorage Index storage used to extract indexed columns from data rows.
     * @param cursor Index cursor.
     * @param readTimestamp Read timestamp of the read-only transaction.
     * @param batchSize Maximum number of rows to collect.
     * @param result Collection the matching rows are appended to.
     * @param tableVersion Table schema version the rows are upgraded to.
     * @return Future that completes when the batch is collected or the cursor is exhausted.
     */
    private CompletableFuture<Void> continueReadOnlyIndexScan(
            TableSchemaAwareIndexStorage schemaAwareIndexStorage,
            Cursor<IndexRow> cursor,
            HybridTimestamp readTimestamp,
            int batchSize,
            List<BinaryRow> result,
            int tableVersion
    ) {
        boolean batchFull = result.size() >= batchSize;

        if (batchFull || !cursor.hasNext()) {
            return nullCompletedFuture();
        }

        IndexRow nextIndexRow = cursor.next();
        RowId nextRowId = nextIndexRow.rowId();

        return resolvePlainReadResult(nextRowId, null, readTimestamp).thenComposeAsync(timedRow -> {
            BinaryRow upgradedRow = upgrade(binaryRow(timedRow), tableVersion);

            // The index may reference row versions whose indexed columns no longer match; skip those.
            boolean matches = upgradedRow != null && indexRowMatches(nextIndexRow, upgradedRow, schemaAwareIndexStorage);

            if (matches) {
                result.add(upgradedRow);
            }

            return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, readTimestamp, batchSize, result, tableVersion);
        }, scanRequestExecutor);
    }

    /**
     * Index scan loop for read-write transactions.
     *
     * <p>Each iteration does the following:
     * <ul>
     *     <li>Retrieves the next row from the index, taking the index locks via {@code indexLocker}.</li>
     *     <li>Takes an S lock on the associated table row.</li>
     *     <li>Fetches the data row and upgrades it to {@code tableVersion}.</li>
     *     <li>Adds the upgraded row to the result if it still matches the index row.</li>
     * </ul>
     * The loop continues asynchronously until the batch is full or the upper bound is reached.
     *
     * <p>The row added to the result is the same upgraded row that was validated against the index row. This matches what the
     * read-only scan does.
     *
     * @param txId Transaction id.
     * @param schemaAwareIndexStorage Index storage.
     * @param indexLocker Index locker.
     * @param indexCursor Index cursor.
     * @param batchSize Batch size.
     * @param result Result collection.
     * @param isUpperBoundAchieved Function to stop on upper bound.
     * @param tableVersion Table schema version at begin timestamp.
     * @return Future.
     */
    private CompletableFuture<Void> continueIndexScan(
            UUID txId,
            TableSchemaAwareIndexStorage schemaAwareIndexStorage,
            SortedIndexLocker indexLocker,
            Cursor<IndexRow> indexCursor,
            int batchSize,
            List<BinaryRow> result,
            Predicate<IndexRow> isUpperBoundAchieved,
            int tableVersion
    ) {
        // Batch is full, exit loop. '>=' (not '==') so that an over-full result can never keep the loop going.
        if (result.size() >= batchSize) {
            return nullCompletedFuture();
        }

        return indexLocker.locksForScan(txId, indexCursor)
                .thenCompose(currentRow -> { // Index row S lock
                    if (isUpperBoundAchieved.test(currentRow)) {
                        return nullCompletedFuture(); // End of range reached. Exit loop.
                    }

                    RowId rowId = currentRow.rowId();

                    return lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.S)
                            .thenComposeAsync(rowLock -> { // Table row S lock
                                return resolvePlainReadResult(rowId, txId).thenCompose(resolvedReadResult -> {
                                    BinaryRow binaryRow = upgrade(binaryRow(resolvedReadResult), tableVersion);

                                    // Add exactly the row that was checked against the index row (the upgraded one),
                                    // consistently with continueReadOnlyIndexScan.
                                    if (binaryRow != null && indexRowMatches(currentRow, binaryRow, schemaAwareIndexStorage)) {
                                        result.add(binaryRow);
                                    }

                                    // Proceed scan.
                                    return continueIndexScan(
                                            txId,
                                            schemaAwareIndexStorage,
                                            indexLocker,
                                            indexCursor,
                                            batchSize,
                                            result,
                                            isUpperBoundAchieved,
                                            tableVersion
                                    );
                                });
                            }, scanRequestExecutor);
                });
    }

    /**
     * Tells whether an index row still describes the given data row.
     *
     * <p>The indexed columns are extracted from {@code binaryRow} with the storage's index row resolver. Their bytes are then compared
     * with the bytes stored in the index row.
     *
     * @param indexRow Index row, read from index storage.
     * @param binaryRow Binary row, read from MV storage.
     * @param schemaAwareIndexStorage Schema aware index storage, to resolve values of indexed columns in a binary row.
     * @return {@code true} if index row matches the binary row, {@code false} otherwise.
     */
    private static boolean indexRowMatches(IndexRow indexRow, BinaryRow binaryRow, TableSchemaAwareIndexStorage schemaAwareIndexStorage) {
        BinaryTuple expectedColumns = schemaAwareIndexStorage.indexRowResolver().extractColumns(binaryRow);

        ByteBuffer storedBytes = indexRow.indexColumns().byteBuffer();
        ByteBuffer expectedBytes = expectedColumns.byteBuffer();

        return storedBytes.equals(expectedBytes);
    }

    /**
     * Index lookup loop for read-write transactions.
     *
     * <p>For every row id from the index cursor, an S lock is taken on the table row and the row is resolved for the transaction. The
     * row is appended to {@code result} when it exists. The loop continues asynchronously on {@code scanRequestExecutor} until the batch
     * is full or the cursor is exhausted.
     *
     * @param txId Transaction id.
     * @param indexCursor Cursor over row ids returned by the index.
     * @param batchSize Maximum number of rows to collect.
     * @param result Collection the found rows are appended to.
     * @return Future that completes when the batch is collected or the cursor is exhausted.
     */
    private CompletableFuture<Void> continueIndexLookup(
            UUID txId,
            Cursor<RowId> indexCursor,
            int batchSize,
            List<BinaryRow> result
    ) {
        boolean done = result.size() >= batchSize || !indexCursor.hasNext();

        if (done) {
            return nullCompletedFuture();
        }

        RowId nextRowId = indexCursor.next();
        LockKey rowLockKey = new LockKey(tableLockKey, nextRowId);

        return lockManager.acquire(txId, rowLockKey, LockMode.S)
                .thenComposeAsync(ignoredLock -> resolvePlainReadResult(nextRowId, txId).thenCompose(timedRow -> {
                    BinaryRow foundRow = timedRow == null ? null : timedRow.binaryRow();

                    if (foundRow != null) {
                        result.add(foundRow);
                    }

                    return continueIndexLookup(txId, indexCursor, batchSize, result);
                }), scanRequestExecutor);
    }

    /**
     * Resolves a result received from a direct storage read.
     *
     * <p>The row is read at {@code timestamp}, or at {@link HybridTimestamp#MAX_VALUE} when no timestamp is given. The result is then
     * passed to {@code resolveReadResult}. The supplier handed to it lazily reads the newest committed version of the row. That supplier
     * yields {@code null} when the storage reports no newest commit timestamp.
     *
     * @param rowId Row id to resolve.
     * @param txId Transaction id is used for RW only.
     * @param timestamp Read timestamp.
     * @return Future finishes with the resolved binary row.
     */
    private CompletableFuture<@Nullable TimedBinaryRow> resolvePlainReadResult(
            RowId rowId,
            @Nullable UUID txId,
            @Nullable HybridTimestamp timestamp
    ) {
        HybridTimestamp effectiveReadTs = timestamp != null ? timestamp : HybridTimestamp.MAX_VALUE;

        ReadResult readResult = mvDataStorage.read(rowId, effectiveReadTs);

        return resolveReadResult(readResult, txId, timestamp, () -> {
            HybridTimestamp newestCommitTs = readResult.newestCommitTimestamp();

            if (newestCommitTs == null) {
                return null;
            }

            ReadResult committed = mvDataStorage.read(rowId, newestCommitTs);

            assert !committed.isWriteIntent() :
                    "The result is not committed [rowId=" + rowId + ", timestamp="
                            + newestCommitTs + ']';

            return new TimedBinaryRow(committed.binaryRow(), committed.commitTimestamp());
        });
    }

    /**
     * Resolves a result received from a direct storage read for a read-write transaction.
     *
     * <p>The latest row version is resolved for {@code txId}. When a row is found, the returned future additionally waits for
     * {@code validateBackwardCompatibility} on that row. A missing row (or a row without data) yields {@code null}.
     *
     * @param rowId Row id.
     * @param txId Transaction id.
     * @return Future finishes with the resolved binary row.
     */
    private CompletableFuture<@Nullable TimedBinaryRow> resolvePlainReadResult(RowId rowId, UUID txId) {
        return resolvePlainReadResult(rowId, txId, null).thenCompose(timedRow -> {
            BinaryRow row = timedRow == null ? null : timedRow.binaryRow();

            if (row == null) {
                return nullCompletedFuture();
            }

            return validateBackwardCompatibility(row, txId).thenApply(ignored -> timedRow);
        });
    }

    /**
     * Processes a write intent switch (transaction cleanup) request when colocation is disabled.
     *
     * <p>Steps:
     * <ol>
     *     <li>Marks the transaction as finished ({@link TxState#COMMITTED} or {@link TxState#ABORTED}) in {@code replicaTxFinishMarker}.</li>
     *     <li>Waits for the transaction's registered in-flight operations to complete.</li>
     *     <li>If the write intents have to be switched, switches them locally and replicates a {@code WriteIntentSwitchCommand} to the
     *     group. The replication future is returned in the {@link CommandApplicationResult}.</li>
     *     <li>Otherwise, returns the replicated info right away.</li>
     * </ol>
     *
     * @param request Transaction cleanup request.
     * @return CompletableFuture of ReplicaResult.
     */
    private CompletableFuture<ReplicaResult> processWriteIntentSwitchAction(WriteIntentSwitchReplicaRequest request) {
        // When doing changes to this code, please take a look at WriteIntentSwitchRequestHandler#handle() as it might also need
        // to be touched.

        assert !nodeProperties.colocationEnabled() : request;

        UUID txId = request.txId();
        boolean commit = request.commit();

        replicaTxFinishMarker.markFinished(txId, commit ? COMMITTED : ABORTED, request.commitTimestamp());

        return awaitCleanupReadyFutures(txId, commit).thenApply(cleanupResult -> {
            if (!cleanupResult.shouldApplyWriteIntent()) {
                return new ReplicaResult(writeIntentSwitchReplicatedInfoFor(request), null);
            }

            CompletableFuture<WriteIntentSwitchReplicatedInfo> replicated = applyWriteIntentSwitchCommandLocallyAndToGroup(request);

            return new ReplicaResult(null, new CommandApplicationResult(null, replicated));
        });
    }

    /**
     * Processes a table-level write intent switch request (only used when colocation is enabled).
     *
     * <p>Waits for the transaction's registered in-flight operations. If required, the write intents are then switched in the local
     * storage. Nothing is replicated from here. The cleanup result itself is returned as the replica result.
     *
     * @param request Table write intent switch request.
     * @return Future with the replica result wrapping the {@code FuturesCleanupResult}.
     */
    private CompletableFuture<ReplicaResult> processTableWriteIntentSwitchAction(TableWriteIntentSwitchReplicaRequest request) {
        assert nodeProperties.colocationEnabled() : request;

        return awaitCleanupReadyFutures(request.txId(), request.commit()).thenApply(cleanupResult -> {
            boolean switchNeeded = cleanupResult.shouldApplyWriteIntent();

            if (switchNeeded) {
                applyWriteIntentSwitchCommandLocally(request);
            }

            return new ReplicaResult(cleanupResult, null);
        });
    }

    /**
     * Builds the replicated-info object for the request's transaction and this replica's replication group.
     *
     * @param request Write intent switch request.
     * @return Replicated info.
     */
    private WriteIntentSwitchReplicatedInfo writeIntentSwitchReplicatedInfoFor(WriteIntentSwitchReplicaRequest request) {
        UUID txId = request.txId();

        return new WriteIntentSwitchReplicatedInfo(txId, replicationGroupId);
    }

    /**
     * Detaches the cleanup-ready futures registered for the transaction and waits for them.
     *
     * <p>The transaction's entry is removed from {@code txCleanupReadyFutures}; returning {@code null} from {@code compute} removes the
     * mapping. The update futures are awaited first, then the RW read futures. Exceptions of these futures are ignored (see
     * {@code allOfFuturesExceptionIgnored}).
     *
     * @param txId Transaction id.
     * @param commit {@code true} for commit, {@code false} for rollback.
     * @return Future with flags telling whether read/update futures existed and whether the write intent switch must be forced.
     */
    private CompletableFuture<FuturesCleanupResult> awaitCleanupReadyFutures(UUID txId, boolean commit) {
        List<CompletableFuture<?>> updateFuts = new ArrayList<>();
        List<CompletableFuture<?>> readFuts = new ArrayList<>();

        // Stays true when the transaction has no registered operations (no entry, or an entry with no futures).
        AtomicBoolean forceSwitch = new AtomicBoolean(true);

        txCleanupReadyFutures.compute(txId, (id, txOps) -> {
            if (txOps != null) {
                // Cleanup futures (both read and update) are empty in two cases:
                // - there were no actions in the transaction
                // - write intent switch is being executed on the new primary (the primary has changed after write intent appeared)
                // Both cases are expected to happen extremely rarely so we are fine to force the write intent switch.

                // The reason for the forced switch is that otherwise write intents would not be switched (if there is no volatile state
                // and FuturesCleanupResult.hadUpdateFutures() returns false).
                forceSwitch.set(txOps.futures.isEmpty());

                for (var opFutures : txOps.futures.entrySet()) {
                    List<CompletableFuture<?>> target = opFutures.getKey().isRwRead() ? readFuts : updateFuts;

                    target.addAll(opFutures.getValue().values());
                }

                txOps.futures.clear();
            }

            // Null removes the transaction's entry from the map.
            return null;
        });

        return allOfFuturesExceptionIgnored(updateFuts, commit, txId)
                .thenCompose(updatesDone -> allOfFuturesExceptionIgnored(readFuts, commit, txId))
                .thenApply(readsDone -> new FuturesCleanupResult(!readFuts.isEmpty(), !updateFuts.isEmpty(), forceSwitch.get()));
    }

    /**
     * Switches the write intents locally and then replicates a write intent switch command to the replication group.
     *
     * <p>The command's catalog version is the safe reliable catalog version for the commit timestamp. For a transaction without a commit
     * timestamp, the transaction's begin timestamp is used instead.
     *
     * @param request Write intent switch request.
     * @return Future that completes with the replicated info once the command has been applied to the group.
     */
    private CompletableFuture<WriteIntentSwitchReplicatedInfo> applyWriteIntentSwitchCommandLocallyAndToGroup(
            WriteIntentSwitchReplicaRequest request
    ) {
        applyWriteIntentSwitchCommandLocally(request);

        WriteIntentSwitchReplicatedInfo replicatedInfo = writeIntentSwitchReplicatedInfoFor(request);

        assert !nodeProperties.colocationEnabled() : request;

        HybridTimestamp commandTs = request.commitTimestamp();

        if (commandTs == null) {
            commandTs = beginTimestamp(request.txId());
        }

        return reliableCatalogVersions.safeReliableCatalogVersionFor(commandTs)
                .thenCompose(catalogVersion -> applyWriteIntentSwitchCommandToGroup(request, catalogVersion))
                .thenApply(ignored -> replicatedInfo);
    }

    /**
     * Switches the transaction's write intents in the local storage via {@code storageUpdateHandler}.
     *
     * @param request Write intent switch request (commit flag, commit timestamp and transaction id are taken from it).
     */
    private void applyWriteIntentSwitchCommandLocally(WriteIntentSwitchReplicaRequestBase request) {
        UUID txId = request.txId();

        storageUpdateHandler.switchWriteIntents(
                txId,
                request.commit(),
                request.commitTimestamp(),
                indexIdsAtRwTxBeginTsOrNull(txId)
        );
    }

    /**
     * Builds a {@code WriteIntentSwitchCommand} for the request and applies it to the replication group.
     *
     * <p>Non-recoverable failures are logged with a warning. Every failure is rethrown, so the returned future still completes
     * exceptionally.
     *
     * @param request Write intent switch request.
     * @param catalogVersion Catalog version required by the command.
     * @return Future of the command application.
     */
    private CompletableFuture<?> applyWriteIntentSwitchCommandToGroup(WriteIntentSwitchReplicaRequest request, int catalogVersion) {
        UUID txId = request.txId();

        WriteIntentSwitchCommand command = PARTITION_REPLICATION_MESSAGES_FACTORY.writeIntentSwitchCommandV2()
                .txId(txId)
                .commit(request.commit())
                .commitTimestamp(request.commitTimestamp())
                .initiatorTime(clockService.current())
                .tableIds(request.tableIds())
                .requiredCatalogVersion(catalogVersion)
                .build();

        return applyCmdWithExceptionHandling(command).exceptionally(err -> {
            boolean recoverable = ReplicatorRecoverableExceptions.isRecoverable(err);

            if (!recoverable) {
                LOG.warn("Failed to complete transaction cleanup command [txId=" + txId + ']', err);
            }

            // Propagate the failure to the caller after logging.
            ExceptionUtils.sneakyThrow(err);

            return null;
        });
    }

    /**
     * Returns a future that completes when every given transaction operation future has completed.
     *
     * <p>An exceptional completion of any of them is swallowed, so the returned future completes normally. With assertions enabled,
     * such a failure during a commit trips an assertion, since committing operations are expected to succeed.
     *
     * @param txFutures Transaction operation futures.
     * @param commit If {@code true} this is a commit otherwise a rollback.
     * @param txId Transaction id.
     * @return The future completes when all futures in passed list are completed.
     */
    private static CompletableFuture<Void> allOfFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures, boolean commit, UUID txId) {
        CompletableFuture<?>[] futuresArray = txFutures.toArray(new CompletableFuture<?>[0]);

        CompletableFuture<Void> allDone = allOf(futuresArray);

        return allDone.exceptionally(err -> {
            assert !commit :
                    "Transaction is committing, but an operation has completed with exception [txId=" + txId
                            + ", err=" + err.getMessage() + ']';

            return null;
        });
    }

    /**
     * Releases every lock held by the given transaction in the local lock manager.
     *
     * @param txId Transaction id.
     */
    private void releaseTxLocks(UUID txId) {
        this.lockManager.releaseAll(txId);
    }

    /**
     * Finds the row and its identifier by given pk search row.
     *
     * <p>The primary key index locks for the key are taken first. The PK index is then scanned and the first live row is handed to
     * {@code action}. When no live row exists, {@code action} receives {@code null}s. The PK index cursor is closed once the resolution
     * future completes, or immediately if setting up the resolution throws.
     *
     * @param pk Binary Tuple representing a primary key.
     * @param txId An identifier of the transaction regarding which we need to resolve the given row.
     * @param action An action to perform on a resolved row.
     * @param <T> A type of the value returned by action.
     * @return A future object representing the result of the given action.
     */
    private <T> CompletableFuture<T> resolveRowByPk(
            BinaryTuple pk,
            UUID txId,
            IgniteTriFunction<@Nullable RowId, @Nullable BinaryRow, @Nullable HybridTimestamp, CompletableFuture<T>> action
    ) {
        IndexLocker pkLocker = indexesLockers.get().get(pkIndexStorage.get().id());

        assert pkLocker != null;

        CompletableFuture<Void> pkLockFut = pkLocker.locksForLookupByKey(txId, pk);

        Supplier<CompletableFuture<T>> resolveUnderLock = () -> {
            Cursor<RowId> pkCursor = getFromPkIndex(pk);
            boolean closeDeferred = false;

            try {
                CompletableFuture<T> resolution = continueResolvingByPk(pkCursor, txId, action)
                        .whenComplete((res, ex) -> pkCursor.close());

                closeDeferred = true;

                return resolution;
            } finally {
                // Close right away only if the asynchronous close could not be attached.
                if (!closeDeferred) {
                    pkCursor.close();
                }
            }
        };

        // Avoid an extra async hop when the locks were granted synchronously.
        return isCompletedSuccessfully(pkLockFut)
                ? resolveUnderLock.get()
                : pkLockFut.thenCompose(ignored -> resolveUnderLock.get());
    }

    /**
     * Walks the PK index cursor until a row id resolves to an existing row for the transaction, then applies {@code action} to it.
     *
     * <p>When the cursor is exhausted without a match, {@code action} is applied to {@code null}s.
     *
     * @param cursor PK index cursor.
     * @param txId Transaction id.
     * @param action Action to apply to the resolved row id, row and last commit timestamp.
     * @param <T> Action result type.
     * @return Future of the action result.
     */
    private <T> CompletableFuture<T> continueResolvingByPk(
            Cursor<RowId> cursor,
            UUID txId,
            IgniteTriFunction<@Nullable RowId, @Nullable BinaryRow, @Nullable HybridTimestamp, CompletableFuture<T>> action
    ) {
        if (cursor.hasNext()) {
            RowId candidateRowId = cursor.next();

            return resolvePlainReadResult(candidateRowId, txId).thenCompose(timedRow -> {
                boolean found = timedRow != null && timedRow.binaryRow() != null;

                if (!found) {
                    return continueResolvingByPk(cursor, txId, action);
                }

                return action.apply(candidateRowId, timedRow.binaryRow(), timedRow.commitTimestamp());
            });
        }

        return action.apply(null, null, null);
    }

    /**
     * Appends an operation to prevent the race between commit/rollback and the operation execution.
     *
     * <p>For a full transaction, the operation is executed directly and all transaction locks are released once it completes.
     *
     * <p>Otherwise, a cleanup-ready future for the operation is registered in {@code txCleanupReadyFutures}. The write intent switch
     * (see {@code awaitCleanupReadyFutures}) can then wait for it. That future completes when the operation completes. For a
     * {@link ReplicaResult} carrying a replication future, it completes when the replication completes instead.
     *
     * <p>If the transaction is unknown to the local transaction manager, already in a final state, or {@code FINISHING}, the operation
     * is not executed. A future failed with {@link TransactionException} is returned in that case.
     *
     * @param txId Transaction id.
     * @param opId Operation id.
     * @param cmdType Command type.
     * @param full {@code True} if a full transaction and can be immediately committed.
     * @param op Operation closure.
     * @return A future object representing the result of the given operation.
     */
    private <T> CompletableFuture<T> appendTxCommand(
            UUID txId,
            OperationId opId,
            RequestType cmdType,
            boolean full,
            Supplier<CompletableFuture<T>> op
    ) {
        if (full) {
            return op.get().whenComplete((v, th) -> {
                // Fast unlock.
                releaseTxLocks(txId);
            });
        }

        var cleanupReadyFut = new CompletableFuture<Void>();

        txCleanupReadyFutures.compute(txId, (id, txOps) -> {
            // First check whether the transaction has already been finished.
            // And complete cleanupReadyFut with exception if it is the case.
            TxStateMeta txStateMeta = txManager.stateMeta(txId);

            if (txStateMeta == null || isFinalState(txStateMeta.txState()) || txStateMeta.txState() == FINISHING) {
                cleanupReadyFut.completeExceptionally(new Exception());

                return txOps;
            }

            // Otherwise collect cleanupReadyFut in the transaction's futures.
            if (txOps == null) {
                txOps = new TxCleanupReadyFutureList();
            }

            txOps.futures.computeIfAbsent(cmdType, type -> new HashMap<>()).put(opId, cleanupReadyFut);

            return txOps;
        });

        if (cleanupReadyFut.isCompletedExceptionally()) {
            TxStateMeta txStateMeta = txManager.stateMeta(txId);

            TxState txState = txStateMeta == null ? null : txStateMeta.txState();
            // isFinishedDueToTimeout() may return null, which is treated as "not finished due to timeout".
            boolean isFinishedDueToTimeout = txStateMeta != null && Boolean.TRUE.equals(txStateMeta.isFinishedDueToTimeout());

            return failedFuture(new TransactionException(
                    isFinishedDueToTimeout ? TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR : TX_ALREADY_FINISHED_ERR,
                    "Transaction is already finished [txId=" + txId + ", txState=" + txState + "]."
            ));
        }

        CompletableFuture<T> fut = op.get();

        fut.whenComplete((v, th) -> {
            if (th != null) {
                cleanupReadyFut.completeExceptionally(th);
            } else {
                if (v instanceof ReplicaResult) {
                    ReplicaResult res = (ReplicaResult) v;

                    if (res.applyResult().replicationFuture() != null) {
                        // The operation is "cleanup-ready" only once its command has been replicated.
                        res.applyResult().replicationFuture().whenComplete((v0, th0) -> {
                            if (th0 != null) {
                                cleanupReadyFut.completeExceptionally(th0);
                            } else {
                                cleanupReadyFut.complete(null);
                            }
                        });
                    } else {
                        cleanupReadyFut.complete(null);
                    }
                } else {
                    cleanupReadyFut.complete(null);
                }
            }
        });

        return fut;
    }

    /**
     * Finds the row visible at the given timestamp for the given primary key, on behalf of a read-only transaction.
     *
     * <p>All PK index candidates are read at {@code ts} and split into write intents and non-empty committed versions.
     * <ul>
     *     <li>With no write intents, the first committed version (or {@code null}) is returned.</li>
     *     <li>Otherwise, the readability of the write intents is resolved (under the busy lock).</li>
     *     <li>If the intents are readable, the first non-empty intent is returned.</li>
     *     <li>If not, the newest committed version behind the first intent that has one is returned. When no intent has one, the first
     *     committed candidate is returned.</li>
     * </ul>
     *
     * @param pk Binary Tuple bytes representing a primary key.
     * @param ts A timestamp regarding which we need to resolve the given row.
     * @return Result of the given action.
     */
    private CompletableFuture<@Nullable BinaryRow> resolveRowByPkForReadOnly(BinaryTuple pk, HybridTimestamp ts) {
        // Several index entries for one search key can exist only if an entry was inserted, deleted and inserted again with the same
        // indexed fields. Hence for any read timestamp at most one of them yields a non-empty read result, and once such a result is
        // found it can be resolved without looking any further.
        try (Cursor<RowId> pkCursor = getFromPkIndex(pk)) {
            // TODO https://issues.apache.org/jira/browse/IGNITE-18767 scan of multiple write intents should not be needed
            List<ReadResult> intents = new ArrayList<>();
            List<ReadResult> committed = new ArrayList<>();

            for (RowId candidateId : pkCursor) {
                ReadResult candidate = mvDataStorage.read(candidateId, ts);

                if (candidate.isWriteIntent()) {
                    intents.add(candidate);
                } else if (!candidate.isEmpty()) {
                    committed.add(candidate);
                }
            }

            if (intents.isEmpty()) {
                // Nothing found in the storage at all.
                if (committed.isEmpty()) {
                    return nullCompletedFuture();
                }

                // No write intents: the committed value is the answer.
                return completedFuture(committed.get(0).binaryRow());
            }

            // Write intents for one key are expected to belong to one transaction (the key is exclusively locked), so resolving the
            // state of that single transaction is enough.
            checkWriteIntentsBelongSameTx(intents);

            ReadResult anyIntent = intents.get(0);

            return inBusyLockAsync(busyLock, () -> resolveWriteIntentReadability(anyIntent, ts)
                    .thenApply(intentReadable -> inBusyLock(busyLock, () -> {
                        if (intentReadable) {
                            return findAny(intents, wi -> !wi.isEmpty()).map(ReadResult::binaryRow).orElse(null);
                        }

                        for (ReadResult intent : intents) {
                            HybridTimestamp newestCommitTs = intent.newestCommitTimestamp();

                            if (newestCommitTs != null) {
                                ReadResult committedResult = mvDataStorage.read(intent.rowId(), newestCommitTs);

                                assert !committedResult.isWriteIntent() :
                                        "The result is not committed [rowId=" + intent.rowId() + ", timestamp="
                                                + newestCommitTs + ']';

                                return committedResult.binaryRow();
                            }
                        }

                        // No suitable value found in write intents, read the committed value (if exists).
                        return findFirst(committed).map(ReadResult::binaryRow).orElse(null);
                    }))
            );
        }
    }

    /**
     * Checks (with assertions enabled) that all given write intents belong to the same transaction.
     *
     * <p>Each intent is compared with an arbitrary reference intent from the collection, on three attributes: transaction id, commit
     * table/zone id and commit partition id.
     *
     * @param writeIntents Write intents; expected to be non-empty (an empty collection fails in {@code orElseThrow()}).
     */
    private static void checkWriteIntentsBelongSameTx(Collection<ReadResult> writeIntents) {
        ReadResult writeIntent = findAny(writeIntents).orElseThrow();

        for (ReadResult wi : writeIntents) {
            assert Objects.equals(wi.transactionId(), writeIntent.transactionId())
                    : "Unexpected write intent, tx1=" + writeIntent.transactionId() + ", tx2=" + wi.transactionId();

            // Label both sides consistently with the attribute being compared.
            assert Objects.equals(wi.commitTableOrZoneId(), writeIntent.commitTableOrZoneId())
                    : "Unexpected write intent, commitTableOrZoneId1=" + writeIntent.commitTableOrZoneId()
                    + ", commitTableOrZoneId2=" + wi.commitTableOrZoneId();

            assert wi.commitPartitionId() == writeIntent.commitPartitionId()
                    : "Unexpected write intent, commitPartitionId1=" + writeIntent.commitPartitionId()
                    + ", commitPartitionId2=" + wi.commitPartitionId();
        }
    }

    /**
     * Compares the tuple slices of two rows and reports whether they are equal.
     *
     * @param row First row.
     * @param row2 Second row.
     * @return {@code true} if the tuple slices of both rows compare as equal.
     */
    private static boolean equalValues(BinaryRow row, BinaryRow row2) {
        var firstSlice = row.tupleSlice();
        var secondSlice = row2.tupleSlice();

        int cmp = firstSlice.compareTo(secondSlice);

        return cmp == 0;
    }

    /**
     * Processes a direct multi-row read request of a read-only transaction ({@code RO_GET_ALL} only).
     *
     * <p>Every primary key is resolved independently at {@code opStartTimestamp}. The results are collected in key order. Any other
     * request type is rejected with an {@link IgniteInternalException}.
     *
     * @param request Read only multiple entries request.
     * @param opStartTimestamp Moment when the operation processing was started in this class; used as the read timestamp.
     * @return Result future.
     */
    private CompletableFuture<List<BinaryRow>> processReadOnlyDirectMultiEntryAction(
            ReadOnlyDirectMultiRowReplicaRequest request,
            HybridTimestamp opStartTimestamp) {
        List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());

        if (request.requestType() != RO_GET_ALL) {
            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                    format("Unknown single request [actionType={}]", request.requestType()));
        }

        int keyCount = primaryKeys.size();
        CompletableFuture<BinaryRow>[] lookups = new CompletableFuture[keyCount];

        for (int idx = 0; idx < keyCount; idx++) {
            lookups[idx] = resolveRowByPkForReadOnly(primaryKeys.get(idx), opStartTimestamp);
        }

        return allOfToList(lookups);
    }

    /**
     * Processes a multi-row read-write request.
     *
     * @param request Multi request operation.
     * @param leaseStartTime Lease start time.
     * @return Listener response.
     */
    private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiRowReplicaRequest request, long leaseStartTime) {
        UUID txId = request.transactionId();
        ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId();
        List<BinaryRow> searchRows = request.binaryRows();

        assert commitPartitionId != null : "Commit partition is null [type=" + request.requestType() + ']';

        switch (request.requestType()) {
            case RW_DELETE_EXACT_ALL: {
                CompletableFuture<RowId>[] deleteExactLockFuts = new CompletableFuture[searchRows.size()];

                Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();

                for (int i = 0; i < searchRows.size(); i++) {
                    BinaryRow searchRow = searchRows.get(i);

                    deleteExactLockFuts[i] = resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                        if (rowId == null) {
                            return nullCompletedFuture();
                        }

                        if (lastCommitTime != null) {
                            lastCommitTimes.put(rowId.uuid(), lastCommitTime);
                        }

                        return takeLocksForDeleteExact(searchRow, rowId, row, txId);
                    });
                }

                return allOf(deleteExactLockFuts).thenCompose(ignore -> {
                    Map<UUID, TimedBinaryRowMessage> rowIdsToDelete = new HashMap<>();
                    // TODO:IGNITE-20669 Replace the result to BitSet.
                    Collection<BinaryRow> result = new ArrayList<>();
                    List<RowId> rows = new ArrayList<>();

                    for (int i = 0; i < searchRows.size(); i++) {
                        RowId lockedRowId = deleteExactLockFuts[i].join();

                        if (lockedRowId != null) {
                            rowIdsToDelete.put(lockedRowId.uuid(), PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
                                    .timestamp(lastCommitTimes.get(lockedRowId.uuid()))
                                    .build());

                            result.add(new NullBinaryRow());

                            rows.add(lockedRowId);
                        } else {
                            result.add(null);
                        }
                    }

                    if (rowIdsToDelete.isEmpty()) {
                        return completedFuture(new ReplicaResult(result, null));
                    }

                    return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                            .thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
                            .thenCompose(
                                    catalogVersion -> applyUpdateAllCommand(
                                            request,
                                            rowIdsToDelete,
                                            catalogVersion,
                                            leaseStartTime
                                    )
                            )
                            .thenApply(res -> new ReplicaResult(result, res));
                });
            }
            case RW_INSERT_ALL: {
                List<BinaryTuple> pks = new ArrayList<>(searchRows.size());

                CompletableFuture<RowId>[] pkReadLockFuts = new CompletableFuture[searchRows.size()];

                for (int i = 0; i < searchRows.size(); i++) {
                    BinaryTuple pk = extractPk(searchRows.get(i));

                    pks.add(pk);

                    pkReadLockFuts[i] = resolveRowByPk(pk, txId, (rowId, row, lastCommitTime) -> completedFuture(rowId));
                }

                return allOf(pkReadLockFuts).thenCompose(ignore -> {
                    // TODO:IGNITE-20669 Replace the result to BitSet.
                    Collection<BinaryRow> result = new ArrayList<>();
                    Map<RowId, BinaryRow> rowsToInsert = new HashMap<>();
                    Set<ByteBuffer> uniqueKeys = new HashSet<>();

                    for (int i = 0; i < searchRows.size(); i++) {
                        BinaryRow row = searchRows.get(i);
                        RowId lockedRow = pkReadLockFuts[i].join();

                        if (lockedRow == null && uniqueKeys.add(pks.get(i).byteBuffer())) {
                            rowsToInsert.put(new RowId(partId(), RowIdGenerator.next()), row);

                            result.add(new NullBinaryRow());
                        } else {
                            result.add(null);
                        }
                    }

                    if (rowsToInsert.isEmpty()) {
                        return completedFuture(new ReplicaResult(result, null));
                    }

                    CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] insertLockFuts = new CompletableFuture[rowsToInsert.size()];

                    int idx = 0;

                    for (Map.Entry<RowId, BinaryRow> entry : rowsToInsert.entrySet()) {
                        insertLockFuts[idx++] = takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
                    }

                    Map<UUID, TimedBinaryRowMessage> convertedMap = rowsToInsert.entrySet().stream()
                            .collect(toMap(
                                    e -> e.getKey().uuid(),
                                    e -> PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
                                            .binaryRowMessage(binaryRowMessage(e.getValue()))
                                            .build()
                            ));

                    return allOf(insertLockFuts)
                            .thenCompose(ignored ->
                                    // We are inserting completely new rows - no need to cleanup anything in this case, hence empty times.
                                    validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                            )
                            .thenCompose(catalogVersion -> applyUpdateAllCommand(
                                            request,
                                            convertedMap,
                                            catalogVersion,
                                            leaseStartTime
                                    )
                            )
                            .thenApply(res -> {
                                // Release short term locks.
                                for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> insertLockFut : insertLockFuts) {
                                    insertLockFut.join().get2()
                                            .forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                                }

                                return new ReplicaResult(result, res);
                            });
                });
            }
            case RW_UPSERT_ALL: {
                CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] rowIdFuts = new CompletableFuture[searchRows.size()];
                BinaryTuple[] pks = new BinaryTuple[searchRows.size()];

                Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();
                BitSet deleted = request.deleted();

                // When the same key is updated multiple times within the same batch, we need to maintain operation order and apply
                // only the last update. This map stores the previous searchRows index for each key.
                Map<ByteBuffer, Integer> prevRowIdx = new HashMap<>();

                for (int i = 0; i < searchRows.size(); i++) {
                    BinaryRow searchRow = searchRows.get(i);
                    boolean isDelete = deleted != null && deleted.get(i);

                    BinaryTuple pk = isDelete
                            ? resolvePk(searchRow.tupleSlice())
                            : extractPk(searchRow);

                    pks[i] = pk;

                    Integer prevRowIdx0 = prevRowIdx.put(pk.byteBuffer(), i);
                    if (prevRowIdx0 != null) {
                        rowIdFuts[prevRowIdx0] = nullCompletedFuture(); // Skip previous row with the same key.
                    }
                }

                for (int i = 0; i < searchRows.size(); i++) {
                    if (rowIdFuts[i] != null) {
                        continue; // Skip previous row with the same key.
                    }

                    BinaryRow searchRow = searchRows.get(i);
                    boolean isDelete = deleted != null && deleted.get(i);

                    rowIdFuts[i] = resolveRowByPk(pks[i], txId, (rowId, row, lastCommitTime) -> {
                        if (isDelete && rowId == null) {
                            return nullCompletedFuture();
                        }

                        if (lastCommitTime != null) {
                            //noinspection DataFlowIssue (rowId is not null if lastCommitTime is not null)
                            lastCommitTimes.put(rowId.uuid(), lastCommitTime);
                        }

                        if (isDelete) {
                            assert row != null;

                            return takeLocksForDelete(row, rowId, txId)
                                    .thenApply(id -> new IgniteBiTuple<>(id, null));
                        }

                        boolean insert = rowId == null;
                        RowId rowId0 = insert ? new RowId(partId(), RowIdGenerator.next()) : rowId;

                        return insert
                                ? takeLocksForInsert(searchRow, rowId0, txId)
                                : takeLocksForUpdate(searchRow, rowId0, txId);
                    });
                }

                return allOf(rowIdFuts).thenCompose(ignore -> {
                    Map<UUID, TimedBinaryRowMessage> rowsToUpdate = IgniteUtils.newHashMap(searchRows.size());
                    List<RowId> rows = new ArrayList<>();

                    for (int i = 0; i < searchRows.size(); i++) {
                        IgniteBiTuple<RowId, Collection<Lock>> locks = rowIdFuts[i].join();
                        if (locks == null) {
                            continue;
                        }

                        RowId lockedRow = locks.get1();

                        TimedBinaryRowMessageBuilder timedBinaryRowMessageBuilder = PARTITION_REPLICATION_MESSAGES_FACTORY
                                .timedBinaryRowMessage()
                                .timestamp(lastCommitTimes.get(lockedRow.uuid()));

                        if (deleted == null || !deleted.get(i)) {
                            timedBinaryRowMessageBuilder.binaryRowMessage(binaryRowMessage(searchRows.get(i)));
                        }

                        rowsToUpdate.put(lockedRow.uuid(), timedBinaryRowMessageBuilder.build());

                        rows.add(lockedRow);
                    }

                    if (rowsToUpdate.isEmpty()) {
                        return completedFuture(new ReplicaResult(null, null));
                    }

                    return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                            .thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
                            .thenCompose(
                                    catalogVersion -> applyUpdateAllCommand(
                                            request,
                                            rowsToUpdate,
                                            catalogVersion,
                                            leaseStartTime
                                    )
                            )
                            .thenApply(res -> {
                                // Release short term locks.
                                for (CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> rowIdFut : rowIdFuts) {
                                    IgniteBiTuple<RowId, Collection<Lock>> futRes = rowIdFut.join();
                                    Collection<Lock> locks = futRes == null ? null : futRes.get2();

                                    if (locks != null) {
                                        locks.forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
                                    }
                                }

                                return new ReplicaResult(null, res);
                            });
                });
            }
            default: {
                throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                        format("Unknown multi request [actionType={}]", request.requestType()));
            }
        }
    }

    /**
     * Processes a read-write multi-row request that addresses rows by primary key only.
     *
     * <p>Supported request types:
     * <ul>
     *     <li>{@code RW_GET_ALL} - resolves every key, takes read locks on the rows that exist and returns the rows in key order
     *     ({@code null} for keys without a row). Schema validation after taking locks is only performed if at least one row was
     *     found.</li>
     *     <li>{@code RW_DELETE_ALL} - resolves and locks every existing row for deletion. The result contains a
     *     {@link NullBinaryRow} marker for every key whose row is deleted and {@code null} for keys without a row. If nothing is
     *     to be deleted, no command is applied; otherwise the write is validated, cleanup is awaited and a single UpdateAll command
     *     carrying only tombstones is applied.</li>
     * </ul>
     *
     * @param request Multi-row request addressed by primary keys.
     * @param leaseStartTime Lease start time.
     * @return Future with the replica result.
     * @throws IgniteInternalException If the request type is not supported.
     */
    private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, long leaseStartTime) {
        UUID txId = request.transactionId();
        // The assertion below permits a missing commit partition for RW_GET_ALL, so the message must not be dereferenced
        // unconditionally (otherwise an NPE would be thrown before the assertion is even evaluated).
        ReplicationGroupIdMessage commitPartitionIdMsg = request.commitPartitionId();
        ReplicationGroupId commitPartitionId = commitPartitionIdMsg == null ? null : commitPartitionIdMsg.asReplicationGroupId();
        List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());

        assert commitPartitionId != null || request.requestType() == RW_GET_ALL
                : "Commit partition is null [type=" + request.requestType() + ']';

        switch (request.requestType()) {
            case RW_GET_ALL: {
                CompletableFuture<BinaryRow>[] rowFuts = new CompletableFuture[primaryKeys.size()];

                for (int i = 0; i < primaryKeys.size(); i++) {
                    rowFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row, lastCommitTime) -> {
                        if (rowId == null) {
                            return nullCompletedFuture();
                        }

                        return takeLocksForGet(rowId, txId)
                                .thenApply(ignored -> row);
                    });
                }

                return allOf(rowFuts)
                        .thenCompose(ignored -> {
                            var result = new ArrayList<BinaryRow>(primaryKeys.size());

                            for (CompletableFuture<BinaryRow> rowFut : rowFuts) {
                                result.add(rowFut.join());
                            }

                            // Nothing was read, hence no locks were taken and there is nothing to validate.
                            if (allElementsAreNull(result)) {
                                return completedFuture(new ReplicaResult(result, null));
                            }

                            return validateRwReadAgainstSchemaAfterTakingLocks(txId)
                                    .thenApply(unused -> new ReplicaResult(result, null));
                        });
            }
            case RW_DELETE_ALL: {
                CompletableFuture<RowId>[] rowIdLockFuts = new CompletableFuture[primaryKeys.size()];

                // Filled concurrently from the resolveRowByPk callbacks.
                Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();

                for (int i = 0; i < primaryKeys.size(); i++) {
                    rowIdLockFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row, lastCommitTime) -> {
                        if (rowId == null) {
                            return nullCompletedFuture();
                        }

                        if (lastCommitTime != null) {
                            lastCommitTimes.put(rowId.uuid(), lastCommitTime);
                        }

                        return takeLocksForDelete(row, rowId, txId);
                    });
                }

                return allOf(rowIdLockFuts).thenCompose(ignore -> {
                    Map<UUID, TimedBinaryRowMessage> rowIdsToDelete = new HashMap<>();
                    // TODO:IGNITE-20669 Replace the result to BitSet.
                    Collection<BinaryRow> result = new ArrayList<>();
                    List<RowId> rows = new ArrayList<>();

                    for (CompletableFuture<RowId> lockFut : rowIdLockFuts) {
                        RowId lockedRowId = lockFut.join();

                        if (lockedRowId != null) {
                            // No binary row is set on the message: it acts as a tombstone for the row.
                            rowIdsToDelete.put(lockedRowId.uuid(), PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage()
                                    .timestamp(lastCommitTimes.get(lockedRowId.uuid()))
                                    .build());

                            rows.add(lockedRowId);

                            result.add(new NullBinaryRow());
                        } else {
                            result.add(null);
                        }
                    }

                    if (rowIdsToDelete.isEmpty()) {
                        return completedFuture(new ReplicaResult(result, null));
                    }

                    return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                            .thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
                            .thenCompose(
                                    catalogVersion -> applyUpdateAllCommand(
                                            rowIdsToDelete,
                                            request.commitPartitionId(),
                                            request.transactionId(),
                                            request.full(),
                                            request.coordinatorId(),
                                            catalogVersion,
                                            request.skipDelayedAck(),
                                            leaseStartTime
                                    )
                            )
                            .thenApply(res -> new ReplicaResult(result, res));
                });
            }
            default: {
                throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                        format("Unknown multi request [actionType={}]", request.requestType()));
            }
        }
    }

    /**
     * Tells whether the given list holds nothing but {@code null} references.
     *
     * @param list List to inspect; it may itself contain {@code null} elements.
     * @param <T> Element type.
     * @return {@code true} if every element is {@code null} (trivially so for an empty list), {@code false} as soon as one
     *      non-{@code null} element is found.
     */
    private static <T> boolean allElementsAreNull(List<T> list) {
        return list.stream().allMatch(element -> element == null);
    }

    /**
     * Hands the command over to {@code raftCommandApplicator.applyCommandWithExceptionHandling}.
     *
     * @param cmd Command to apply.
     * @return Future returned by the command applicator.
     */
    private CompletableFuture<Object> applyCmdWithExceptionHandling(Command cmd) {
        CompletableFuture<Object> applicationFut = raftCommandApplicator.applyCommandWithExceptionHandling(cmd);

        return applicationFut;
    }

    /**
     * Builds an {@link UpdateCommand} for a single row and applies it.
     *
     * <p>Non-full transaction: the row is first written to the local storage (unless {@code SKIP_UPDATES} is set) and then the
     * command is submitted. With {@code skipDelayedAck} the returned future completes with {@code null} after the command future
     * completes; otherwise it completes immediately with a result that carries the replication future (delayed ack).
     *
     * <p>Full transaction: the command is applied first. A lease mismatch reported by the command result raises
     * {@link PrimaryReplicaMissException}. If the primary is among the group's peers and learners, the safe time from the result
     * is awaited; otherwise the row is written to the local storage using that safe time.
     *
     * @param commitPartitionId Commit partition ID.
     * @param rowUuid Row UUID.
     * @param row Row, {@code null} for a removal.
     * @param lastCommitTimestamp The timestamp of the last committed entry for the row.
     * @param txId Transaction ID.
     * @param full {@code True} if this is a full transaction.
     * @param txCoordinatorId Transaction coordinator id.
     * @param catalogVersion Validated catalog version associated with given operation.
     * @param skipDelayedAck {@code True} to skip delayed ack optimization.
     * @param leaseStartTime Lease start time.
     * @return A local update ready future, possibly having a nested replication future as a result for delayed ack purpose.
     */
    private CompletableFuture<CommandApplicationResult> applyUpdateCommand(
            ReplicationGroupId commitPartitionId,
            UUID rowUuid,
            @Nullable BinaryRow row,
            @Nullable HybridTimestamp lastCommitTimestamp,
            UUID txId,
            boolean full,
            UUID txCoordinatorId,
            int catalogVersion,
            boolean skipDelayedAck,
            long leaseStartTime
    ) {
        // Lease start time check within the replication group is needed only for full txns.
        Long leaseStartTimeToCheck = full ? leaseStartTime : null;

        UpdateCommand cmd = updateCommand(
                commitPartitionId,
                rowUuid,
                row,
                lastCommitTimestamp,
                txId,
                full,
                txCoordinatorId,
                clockService.current(),
                catalogVersion,
                leaseStartTimeToCheck
        );

        if (cmd.full()) {
            return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
                UpdateCommandResult updateResult = (UpdateCommandResult) res;

                if (updateResult != null && !updateResult.isPrimaryReplicaMatch()) {
                    throw new PrimaryReplicaMissException(txId, cmd.leaseStartTime(), updateResult.currentLeaseStartTime());
                }

                boolean primaryInGroup = updateResult != null && updateResult.isPrimaryInPeersAndLearners();
                HybridTimestamp safeTs = updateResult == null ? null : hybridTimestamp(updateResult.safeTimestamp());

                if (primaryInGroup) {
                    return safeTime.waitFor(safeTs)
                            .thenApply(ignored -> new CommandApplicationResult(safeTs, null));
                }

                if (!SKIP_UPDATES) {
                    // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
                    storageUpdateHandler.handleUpdate(
                            cmd.txId(),
                            cmd.rowUuid(),
                            cmd.commitPartitionId().asReplicationGroupId(),
                            cmd.rowToUpdate(),
                            false,
                            null,
                            safeTs,
                            null,
                            indexIdsAtRwTxBeginTs(txId)
                    );
                }

                return completedFuture(new CommandApplicationResult(safeTs, null));
            });
        }

        // Non-full transaction: the local write happens before the command is submitted, regardless of skipDelayedAck.
        if (!SKIP_UPDATES) {
            // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
            storageUpdateHandler.handleUpdate(
                    cmd.txId(),
                    cmd.rowUuid(),
                    cmd.commitPartitionId().asReplicationGroupId(),
                    cmd.rowToUpdate(),
                    true,
                    null,
                    null,
                    null,
                    indexIdsAtRwTxBeginTs(txId)
            );
        }

        CompletableFuture<Object> applyFut = applyCmdWithExceptionHandling(cmd);

        if (skipDelayedAck) {
            // The returned future only completes after the command future does.
            return applyFut.thenApply(res -> null);
        }

        CompletableFuture<UUID> repFut = applyFut.thenApply(res -> cmd.txId());

        return completedFuture(new CommandApplicationResult(null, repFut));
    }

    /**
     * Unpacks a single-row request and delegates to the detailed {@code applyUpdateCommand} overload.
     *
     * @param request Read write single row replica request; supplies the commit partition, transaction ID, full flag,
     *      coordinator ID and delayed-ack setting.
     * @param rowUuid Row UUID.
     * @param row Row, {@code null} for a removal.
     * @param lastCommitTimestamp The timestamp of the last committed entry for the row.
     * @param catalogVersion Validated catalog version associated with given operation.
     * @param leaseStartTime Lease start time.
     * @return A local update ready future, possibly having a nested replication future as a result for delayed ack purpose.
     */
    private CompletableFuture<CommandApplicationResult> applyUpdateCommand(
            ReadWriteSingleRowReplicaRequest request,
            UUID rowUuid,
            @Nullable BinaryRow row,
            @Nullable HybridTimestamp lastCommitTimestamp,
            int catalogVersion,
            long leaseStartTime
    ) {
        ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId();
        UUID txId = request.transactionId();
        boolean full = request.full();
        UUID coordinatorId = request.coordinatorId();
        boolean skipDelayedAck = request.skipDelayedAck();

        return applyUpdateCommand(
                commitPartitionId,
                rowUuid,
                row,
                lastCommitTimestamp,
                txId,
                full,
                coordinatorId,
                catalogVersion,
                skipDelayedAck,
                leaseStartTime
        );
    }

    /**
     * Builds an UpdateAll command for the given rows and applies it.
     *
     * <p>Non-full transaction: the rows are first written to the local storage (unless {@code SKIP_UPDATES} is set) and then the
     * command is submitted. With {@code skipDelayedAck} the returned future completes with {@code null} after the command future
     * completes; otherwise it completes immediately with a result that carries the replication future (delayed ack).
     *
     * <p>Full transaction: the command is applied first. A lease mismatch reported by the command result raises
     * {@link PrimaryReplicaMissException}. If the primary is among the group's peers and learners, the safe time from the result
     * is awaited; otherwise the rows are written to the local storage using that safe time. A {@code null} command result is
     * tolerated the same way as in the single-row {@code applyUpdateCommand}.
     *
     * @param rowsToUpdate All {@link BinaryRow}s represented as {@link TimedBinaryRowMessage}s to be updated.
     * @param commitPartitionId Commit partition ID of the transaction.
     * @param txId Transaction ID.
     * @param full {@code true} if this is a single-command transaction.
     * @param txCoordinatorId Transaction coordinator id.
     * @param catalogVersion Validated catalog version associated with given operation.
     * @param skipDelayedAck {@code true} to disable the delayed ack optimization.
     * @param leaseStartTime Lease start time; only checked within the replication group for full transactions.
     * @return A local update ready future, possibly having a nested replication future as a result for delayed ack purpose.
     */
    private CompletableFuture<CommandApplicationResult> applyUpdateAllCommand(
            Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
            ReplicationGroupIdMessage commitPartitionId,
            UUID txId,
            boolean full,
            UUID txCoordinatorId,
            int catalogVersion,
            boolean skipDelayedAck,
            long leaseStartTime
    ) {
        UpdateAllCommand cmd = updateAllCommand(
                rowsToUpdate,
                commitPartitionId,
                txId,
                clockService.current(),
                full,
                txCoordinatorId,
                catalogVersion,
                full ? leaseStartTime : null  // Lease start time check within the replication group is needed only for full txns.
        );

        if (!cmd.full()) {
            // Honour SKIP_UPDATES exactly like the single-row applyUpdateCommand does.
            if (!SKIP_UPDATES) {
                // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
                storageUpdateHandler.handleUpdateAll(
                        cmd.txId(),
                        cmd.rowsToUpdate(),
                        cmd.commitPartitionId().asReplicationGroupId(),
                        true,
                        null,
                        null,
                        indexIdsAtRwTxBeginTs(txId)
                );
            }

            if (skipDelayedAck) {
                return applyCmdWithExceptionHandling(cmd).thenApply(res -> null);
            }

            CompletableFuture<UUID> repFut = applyCmdWithExceptionHandling(cmd).thenApply(res -> cmd.txId());

            return completedFuture(new CommandApplicationResult(null, repFut));
        } else {
            return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
                UpdateCommandResult updateCommandResult = (UpdateCommandResult) res;

                // Null-safe, consistent with the single-row applyUpdateCommand.
                if (updateCommandResult != null && !updateCommandResult.isPrimaryReplicaMatch()) {
                    throw new PrimaryReplicaMissException(
                            cmd.txId(),
                            cmd.leaseStartTime(),
                            updateCommandResult.currentLeaseStartTime()
                    );
                }

                if (updateCommandResult != null && updateCommandResult.isPrimaryInPeersAndLearners()) {
                    HybridTimestamp safeTs = hybridTimestamp(updateCommandResult.safeTimestamp());
                    return safeTime.waitFor(safeTs)
                            .thenApply(ignored -> new CommandApplicationResult(safeTs, null));
                } else {
                    HybridTimestamp safeTs = updateCommandResult == null ? null : hybridTimestamp(updateCommandResult.safeTimestamp());

                    if (!SKIP_UPDATES) {
                        // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
                        storageUpdateHandler.handleUpdateAll(
                                cmd.txId(),
                                cmd.rowsToUpdate(),
                                cmd.commitPartitionId().asReplicationGroupId(),
                                false,
                                null,
                                safeTs,
                                indexIdsAtRwTxBeginTs(txId)
                        );
                    }

                    return completedFuture(new CommandApplicationResult(safeTs, null));
                }
            });
        }
    }

    /**
     * Unpacks a multi-row request and delegates to the detailed {@code applyUpdateAllCommand} overload.
     *
     * @param request Read write multi rows replica request; supplies the commit partition, transaction ID, full flag,
     *      coordinator ID and delayed-ack setting.
     * @param rowsToUpdate All {@link BinaryRow}s represented as {@link TimedBinaryRowMessage}s to be updated.
     * @param catalogVersion Validated catalog version associated with given operation.
     * @param leaseStartTime Lease start time.
     * @return A local update ready future, possibly having a nested replication future as a result for delayed ack purpose.
     */
    private CompletableFuture<CommandApplicationResult> applyUpdateAllCommand(
            ReadWriteMultiRowReplicaRequest request,
            Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
            int catalogVersion,
            long leaseStartTime
    ) {
        ReplicationGroupIdMessage commitPartitionId = request.commitPartitionId();
        UUID txId = request.transactionId();
        boolean full = request.full();
        UUID coordinatorId = request.coordinatorId();
        boolean skipDelayedAck = request.skipDelayedAck();

        return applyUpdateAllCommand(
                rowsToUpdate,
                commitPartitionId,
                txId,
                full,
                coordinatorId,
                catalogVersion,
                skipDelayedAck,
                leaseStartTime
        );
    }

    /**
     * Handles a read-only direct single-row request; {@code RO_GET} is the only supported request type.
     *
     * @param request Read only single entry request.
     * @param opStartTimestamp Moment when the operation processing was started in this class; used as the read timestamp.
     * @return Future with the row found for the requested primary key.
     * @throws IgniteInternalException If the request type is not {@code RO_GET}.
     */
    private CompletableFuture<BinaryRow> processReadOnlyDirectSingleEntryAction(
            ReadOnlyDirectSingleRowReplicaRequest request,
            HybridTimestamp opStartTimestamp
    ) {
        BinaryTuple pk = resolvePk(request.primaryKey());

        if (request.requestType() == RO_GET) {
            // The operation start time doubles as the read timestamp.
            return resolveRowByPkForReadOnly(pk, opStartTimestamp);
        }

        throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                format("Unknown single request [actionType={}]", request.requestType()));
    }

    /**
     * Precesses single request.
     *
     * @param request Single request operation.
     * @param leaseStartTime Lease start time.
     * @return Listener response.
     */
    private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowReplicaRequest request, long leaseStartTime) {
        UUID txId = request.transactionId();
        BinaryRow searchRow = request.binaryRow();
        ReplicationGroupId commitPartitionId = request.commitPartitionId().asReplicationGroupId();

        assert commitPartitionId != null : "Commit partition is null [type=" + request.requestType() + ']';

        switch (request.requestType()) {
            case RW_DELETE_EXACT: {
                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        return completedFuture(new ReplicaResult(false, null));
                    }

                    return takeLocksForDeleteExact(searchRow, rowId, row, txId)
                            .thenCompose(validatedRowId -> {
                                if (validatedRowId == null) {
                                    return completedFuture(new ReplicaResult(false, null));
                                }

                                return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                        .thenCompose(catalogVersion -> awaitCleanup(validatedRowId, catalogVersion))
                                        .thenCompose(
                                                catalogVersion -> applyUpdateCommand(
                                                        request,
                                                        validatedRowId.uuid(),
                                                        null,
                                                        lastCommitTime,
                                                        catalogVersion,
                                                        leaseStartTime
                                                )
                                        )
                                        .thenApply(res -> new ReplicaResult(true, res));
                            });
                });
            }
            case RW_INSERT: {
                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    if (rowId != null) {
                        return completedFuture(new ReplicaResult(false, null));
                    }

                    RowId rowId0 = new RowId(partId(), RowIdGenerator.next());

                    return takeLocksForInsert(searchRow, rowId0, txId)
                            .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                    .thenCompose(
                                            catalogVersion -> applyUpdateCommand(
                                                    request,
                                                    rowId0.uuid(),
                                                    searchRow,
                                                    lastCommitTime,
                                                    catalogVersion,
                                                    leaseStartTime
                                            )
                                    )
                                    .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
                            .thenApply(tuple -> {
                                // Release short term locks.
                                tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                                return new ReplicaResult(true, tuple.get1());
                            });
                });
            }
            case RW_UPSERT: {
                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    boolean insert = rowId == null;

                    RowId rowId0 = insert ? new RowId(partId(), RowIdGenerator.next()) : rowId;

                    CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = insert
                            ? takeLocksForInsert(searchRow, rowId0, txId)
                            : takeLocksForUpdate(searchRow, rowId0, txId);

                    return lockFut
                            .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                    .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                    .thenCompose(
                                            catalogVersion -> applyUpdateCommand(
                                                    request,
                                                    rowId0.uuid(),
                                                    searchRow,
                                                    lastCommitTime,
                                                    catalogVersion,
                                                    leaseStartTime
                                            )
                                    )
                                    .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
                            .thenApply(tuple -> {
                                // Release short term locks.
                                tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                                return new ReplicaResult(null, tuple.get1());
                            });
                });
            }
            case RW_GET_AND_UPSERT: {
                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    boolean insert = rowId == null;

                    RowId rowId0 = insert ? new RowId(partId(), RowIdGenerator.next()) : rowId;

                    CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = insert
                            ? takeLocksForInsert(searchRow, rowId0, txId)
                            : takeLocksForUpdate(searchRow, rowId0, txId);

                    return lockFut
                            .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                    .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                    .thenCompose(
                                            catalogVersion -> applyUpdateCommand(
                                                    request,
                                                    rowId0.uuid(),
                                                    searchRow,
                                                    lastCommitTime,
                                                    catalogVersion,
                                                    leaseStartTime
                                            )
                                    )
                                    .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
                            .thenApply(tuple -> {
                                // Release short term locks.
                                tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                                return new ReplicaResult(row, tuple.get1());
                            });
                });
            }
            case RW_GET_AND_REPLACE: {
                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        return completedFuture(new ReplicaResult(null, null));
                    }

                    return takeLocksForUpdate(searchRow, rowId, txId)
                            .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                    .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                    .thenCompose(
                                            catalogVersion -> applyUpdateCommand(
                                                    request,
                                                    rowId.uuid(),
                                                    searchRow,
                                                    lastCommitTime,
                                                    catalogVersion,
                                                    leaseStartTime
                                            )
                                    )
                                    .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
                            .thenApply(tuple -> {
                                // Release short term locks.
                                tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                                return new ReplicaResult(row, tuple.get1());
                            });
                });
            }
            case RW_REPLACE_IF_EXIST: {
                return resolveRowByPk(extractPk(searchRow), txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        return completedFuture(new ReplicaResult(false, null));
                    }

                    return takeLocksForUpdate(searchRow, rowId, txId)
                            .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                    .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                    .thenCompose(
                                            catalogVersion -> applyUpdateCommand(
                                                    request,
                                                    rowId.uuid(),
                                                    searchRow,
                                                    lastCommitTime,
                                                    catalogVersion,
                                                    leaseStartTime
                                            )
                                    )
                                    .thenApply(res -> new IgniteBiTuple<>(res, rowIdLock)))
                            .thenApply(tuple -> {
                                // Release short term locks.
                                tuple.get2().get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                                return new ReplicaResult(true, tuple.get1());
                            });
                });
            }
            default: {
                throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                        format("Unknown single request [actionType={}]", request.requestType()));
            }
        }
    }

    /**
     * Processes a read-write request addressed to a single row by its primary key: {@code RW_GET}, {@code RW_DELETE} or
     * {@code RW_GET_AND_DELETE}.
     *
     * @param request Single request operation.
     * @param leaseStartTime Lease start time.
     * @return Listener response.
     * @throws IgniteInternalException If the request type is not supported by this method.
     */
    private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowPkReplicaRequest request, long leaseStartTime) {
        UUID txId = request.transactionId();
        BinaryTuple primaryKey = resolvePk(request.primaryKey());

        // The commit partition is allowed to be absent for RW_GET (see the assertion below), so the message must be unwrapped
        // null-safely: dereferencing it unconditionally would throw an NPE before the assertion is even evaluated.
        ReplicationGroupIdMessage commitPartitionIdMessage = request.commitPartitionId();
        ReplicationGroupId commitPartitionId = commitPartitionIdMessage == null ? null : commitPartitionIdMessage.asReplicationGroupId();

        assert commitPartitionId != null || request.requestType() == RW_GET :
                "Commit partition is null [type=" + request.requestType() + ']';

        switch (request.requestType()) {
            case RW_GET: {
                return resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        return nullCompletedFuture();
                    }

                    return takeLocksForGet(rowId, txId)
                            .thenCompose(ignored -> validateRwReadAgainstSchemaAfterTakingLocks(txId))
                            .thenApply(ignored -> new ReplicaResult(row, null));
                });
            }
            case RW_DELETE: {
                return resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        return completedFuture(new ReplicaResult(false, null));
                    }

                    return takeLocksForDelete(row, rowId, txId)
                            .thenCompose(rowLock -> validateWriteAgainstSchemaAfterTakingLocks(txId))
                            .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                            .thenCompose(
                                    catalogVersion -> applyUpdateCommand(
                                            // Write paths require a commit partition; fail loudly if it is missing.
                                            request.commitPartitionId().asReplicationGroupId(),
                                            rowId.uuid(),
                                            null,
                                            lastCommitTime,
                                            txId,
                                            request.full(),
                                            request.coordinatorId(),
                                            catalogVersion,
                                            request.skipDelayedAck(),
                                            leaseStartTime
                                    )
                            )
                            .thenApply(res -> new ReplicaResult(true, res));
                });
            }
            case RW_GET_AND_DELETE: {
                return resolveRowByPk(primaryKey, txId, (rowId, row, lastCommitTime) -> {
                    if (rowId == null) {
                        return nullCompletedFuture();
                    }

                    return takeLocksForDelete(row, rowId, txId)
                            .thenCompose(ignored -> validateWriteAgainstSchemaAfterTakingLocks(txId))
                            .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                            .thenCompose(
                                    catalogVersion -> applyUpdateCommand(
                                            // Write paths require a commit partition; fail loudly if it is missing.
                                            request.commitPartitionId().asReplicationGroupId(),
                                            rowId.uuid(),
                                            null,
                                            lastCommitTime,
                                            txId,
                                            request.full(),
                                            request.coordinatorId(),
                                            catalogVersion,
                                            request.skipDelayedAck(),
                                            leaseStartTime
                                    )
                            )
                            .thenApply(res -> new ReplicaResult(row, res));
                });
            }
            default: {
                throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                        format("Unknown single request [actionType={}]", request.requestType()));
            }
        }
    }

    /**
     * Returns a future that completes with {@code result} once the asynchronous cleanup registered for the given row, if any, is done.
     *
     * @param rowId Row Id of an existing row that the transaction affects, or {@code null} if there is no such row.
     * @param result The value that the returned future will wrap.
     * @param <T> Type of the {@code result}.
     * @return Future that completes with {@code result} after the pending cleanup of the row (if any) finishes.
     */
    private <T> CompletableFuture<T> awaitCleanup(@Nullable RowId rowId, T result) {
        CompletableFuture<?> pendingCleanup;

        if (rowId != null) {
            pendingCleanup = rowCleanupMap.getOrDefault(rowId, nullCompletedFuture());
        } else {
            pendingCleanup = nullCompletedFuture();
        }

        return pendingCleanup.thenApply(unused -> result);
    }

    /**
     * Returns a future that completes with {@code result} once the asynchronous cleanups registered for all the given rows are done.
     *
     * @param rowIds Row Ids of existing rows that the transaction affects.
     * @param result The value that the returned future will wrap.
     * @param <T> Type of the {@code result}.
     * @return Future that completes with {@code result} after every pending cleanup among {@code rowIds} finishes.
     */
    private <T> CompletableFuture<T> awaitCleanup(Collection<RowId> rowIds, T result) {
        // Fast path: no cleanup is in flight at all.
        if (rowCleanupMap.isEmpty()) {
            return completedFuture(result);
        }

        var pendingCleanups = new ArrayList<CompletableFuture<?>>(rowIds.size());

        for (RowId id : rowIds) {
            CompletableFuture<?> cleanup = rowCleanupMap.get(id);

            if (cleanup == null) {
                continue;
            }

            pendingCleanups.add(cleanup);
        }

        if (pendingCleanups.isEmpty()) {
            return completedFuture(result);
        }

        CompletableFuture<?>[] toAwait = pendingCleanups.toArray(new CompletableFuture<?>[0]);

        return allOf(toAwait).thenApply(unused -> result);
    }

    /**
     * Builds a binary tuple holding only the primary key columns of the given row.
     *
     * <p>This method must ONLY be invoked when we're sure that a schema version equal to {@code row.schemaVersion()}
     * is already available on this node (see {@link SchemaSyncService}).
     *
     * @param row Row for which to do extraction.
     * @return Binary tuple with the row's primary key columns.
     */
    private BinaryTuple extractPk(BinaryRow row) {
        var pkColumnsResolver = pkIndexStorage.get().indexRowResolver();

        return pkColumnsResolver.extractColumns(row);
    }

    /** Resolves the primary key binary tuple represented by {@code bytes} using the primary key index storage. */
    private BinaryTuple resolvePk(ByteBuffer bytes) {
        var pkStorage = pkIndexStorage.get();

        return pkStorage.resolve(bytes);
    }

    /**
     * Resolves every entry of {@code bytesList} via {@link #resolvePk(ByteBuffer)}, preserving the order.
     *
     * @param bytesList Primary keys to resolve.
     * @return Mutable list with one resolved tuple per input entry.
     */
    private List<BinaryTuple> resolvePks(List<ByteBuffer> bytesList) {
        List<BinaryTuple> resolved = new ArrayList<>(bytesList.size());

        bytesList.forEach(pkBytes -> resolved.add(resolvePk(pkBytes)));

        return resolved;
    }

    /** Looks up the row ids stored in the primary key index storage under the given key. */
    private Cursor<RowId> getFromPkIndex(BinaryTuple key) {
        var pkIndex = pkIndexStorage.get().storage();

        return pkIndex.get(key);
    }

    /**
     * Takes all required locks on a key before updating an existing row: an {@code IX} lock on the table, an {@code X} lock on the
     * row and the put locks on all indexes.
     *
     * @param binaryRow New version of the row; used to take the index locks.
     * @param rowId Id of the row being updated.
     * @param txId Transaction id.
     * @return Future completes with tuple {@link RowId} and collection of short-term {@link Lock}s taken on indexes.
     */
    private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForUpdate(BinaryRow binaryRow, RowId rowId, UUID txId) {
        var tableLockFut = lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.IX);

        return tableLockFut
                .thenCompose(tableLock -> lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.X))
                .thenCompose(rowLock -> takePutLockOnIndexes(binaryRow, rowId, txId))
                .thenApply(indexLocks -> new IgniteBiTuple<>(rowId, indexLocks));
    }

    /**
     * Takes all required locks on a key before inserting a new row: an {@code IX} lock on the table and the put locks on all indexes.
     * No lock on the row itself is taken here.
     *
     * @param binaryRow Table row.
     * @param rowId Id assigned to the row being inserted.
     * @param txId Transaction id.
     * @return Future completes with tuple {@link RowId} and collection of short-term {@link Lock}s taken on indexes.
     */
    private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForInsert(BinaryRow binaryRow, RowId rowId, UUID txId) {
        var tableLockFut = lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.IX);

        return tableLockFut
                .thenCompose(tableLock -> takePutLockOnIndexes(binaryRow, rowId, txId))
                .thenApply(indexLocks -> new IgniteBiTuple<>(rowId, indexLocks));
    }

    /**
     * Takes the insert locks on all indexes of the table for the given row.
     *
     * @param binaryRow Row being put.
     * @param rowId Id of the row being put.
     * @param txId Transaction id.
     * @return Future that completes with the non-null locks returned by the index lockers; the callers release them as short-term
     *         locks once the update is applied.
     */
    private CompletableFuture<Collection<Lock>> takePutLockOnIndexes(BinaryRow binaryRow, RowId rowId, UUID txId) {
        Collection<IndexLocker> indexes = indexesLockers.get().values();

        if (nullOrEmpty(indexes)) {
            return emptyCollectionCompletedFuture();
        }

        // A typed list instead of a raw, pre-sized generic array: no unchecked conversion, and no reliance on size() matching
        // the number of elements produced by the iteration below.
        List<CompletableFuture<Lock>> lockFutures = new ArrayList<>(indexes.size());

        for (IndexLocker locker : indexes) {
            lockFutures.add(locker.locksForInsert(txId, binaryRow, rowId));
        }

        return allOf(lockFutures.toArray(new CompletableFuture<?>[0])).thenApply(unused -> {
            List<Lock> shortTermLocks = new ArrayList<>();

            for (CompletableFuture<Lock> lockFut : lockFutures) {
                // All futures are already complete at this point, join() does not block.
                Lock shortTermLock = lockFut.join();

                if (shortTermLock != null) {
                    shortTermLocks.add(shortTermLock);
                }
            }

            return shortTermLocks;
        });
    }

    /**
     * Takes the remove locks on all indexes of the table for the given row.
     *
     * @param binaryRow Row being removed.
     * @param rowId Id of the row being removed.
     * @param txId Transaction id.
     * @return Future that completes when the remove locks on all indexes have been acquired.
     */
    private CompletableFuture<?> takeRemoveLockOnIndexes(BinaryRow binaryRow, RowId rowId, UUID txId) {
        Collection<IndexLocker> indexes = indexesLockers.get().values();

        if (nullOrEmpty(indexes)) {
            return nullCompletedFuture();
        }

        // Collect into a list rather than filling a pre-sized array by index, so the result does not depend on size()
        // matching the number of elements produced by the iteration below.
        List<CompletableFuture<?>> lockFutures = new ArrayList<>(indexes.size());

        for (IndexLocker locker : indexes) {
            lockFutures.add(locker.locksForRemove(txId, binaryRow, rowId));
        }

        return allOf(lockFutures.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Takes all required locks on a key before deleting the value, but only if the current row equals the expected one.
     *
     * <p>An {@code IX} lock on the table and an {@code S} lock on the row are always taken. The {@code X} lock on the row and the
     * remove locks on indexes are taken only when {@code actualRow} matches {@code expectedRow}.
     *
     * @param expectedRow Row value the caller expects to delete.
     * @param rowId Id of the row.
     * @param actualRow Current row value.
     * @param txId Transaction id.
     * @return Future completes with {@link RowId}, or {@code null} if the current value does not match the expected one.
     */
    private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
        return lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.IX)
                .thenCompose(tableLock -> lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.S)) // S lock on RowId
                .thenCompose(sharedRowLock -> {
                    if (!equalValues(actualRow, expectedRow)) {
                        // The row does not hold the expected value: nothing to delete.
                        return nullCompletedFuture();
                    }

                    return lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.X) // X lock on RowId
                            .thenCompose(exclusiveRowLock -> takeRemoveLockOnIndexes(actualRow, rowId, txId))
                            .thenApply(indexLocks -> rowId);
                });
    }

    /**
     * Takes all required locks on a key before deleting the value: an {@code IX} lock on the table, an {@code X} lock on the row and
     * the remove locks on all indexes.
     *
     * @param binaryRow Row being deleted; used to take the index locks.
     * @param rowId Id of the row being deleted.
     * @param txId Transaction id.
     * @return Future completes with the given {@link RowId} once all locks are acquired.
     */
    private CompletableFuture<RowId> takeLocksForDelete(BinaryRow binaryRow, RowId rowId, UUID txId) {
        var tableLockFut = lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.IX);

        return tableLockFut
                .thenCompose(tableLock -> lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.X)) // X lock on RowId
                .thenCompose(exclusiveRowLock -> takeRemoveLockOnIndexes(binaryRow, rowId, txId))
                .thenApply(indexLocks -> rowId);
    }

    /**
     * Takes the lock required to read a row: an {@code S} lock on the row.
     *
     * @param rowId Id of the row being read.
     * @param txId Transaction id.
     * @return Future completes with the given {@link RowId} once the lock is acquired.
     */
    private CompletableFuture<RowId> takeLocksForGet(RowId rowId, UUID txId) {
        LockKey rowLockKey = new LockKey(tableLockKey, rowId);

        return lockManager.acquire(txId, rowLockKey, LockMode.S) // S lock on RowId
                .thenApply(sharedRowLock -> rowId);
    }

    /**
     * Processes a request carrying both the expected (old) and the new version of a row. Only {@code RW_REPLACE} is supported: the
     * row found by the new row's primary key is replaced only if its current value matches the expected one.
     *
     * @param request Two actions operation request.
     * @param leaseStartTime Lease start time.
     * @return Listener response: {@code false} result if nothing was replaced, {@code true} result otherwise.
     * @throws IgniteInternalException If the request type is not {@code RW_REPLACE}.
     */
    private CompletableFuture<ReplicaResult> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request, long leaseStartTime) {
        BinaryRow newRow = request.newBinaryRow();
        BinaryRow expectedRow = request.oldBinaryRow();
        ReplicationGroupIdMessage commitPartitionId = request.commitPartitionId();

        assert commitPartitionId != null : "Commit partition is null [type=" + request.requestType() + ']';

        UUID txId = request.transactionId();

        if (request.requestType() != RW_REPLACE) {
            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                    format("Unknown two actions operation [actionType={}]", request.requestType()));
        }

        return resolveRowByPk(extractPk(newRow), txId, (rowId, row, lastCommitTime) -> {
            if (rowId == null) {
                // No row with such a primary key: nothing to replace.
                return completedFuture(new ReplicaResult(false, null));
            }

            return takeLocksForReplace(expectedRow, row, newRow, rowId, txId)
                    .thenCompose(rowIdAndLocks -> {
                        if (rowIdAndLocks == null) {
                            // The current value does not match the expected one.
                            return completedFuture(new ReplicaResult(false, null));
                        }

                        RowId lockedRowId = rowIdAndLocks.get1();
                        Collection<Lock> shortTermLocks = rowIdAndLocks.get2();

                        return validateWriteAgainstSchemaAfterTakingLocks(txId)
                                .thenCompose(catalogVersion -> awaitCleanup(lockedRowId, catalogVersion))
                                .thenCompose(catalogVersion -> applyUpdateCommand(
                                        commitPartitionId.asReplicationGroupId(),
                                        lockedRowId.uuid(),
                                        newRow,
                                        lastCommitTime,
                                        txId,
                                        request.full(),
                                        request.coordinatorId(),
                                        catalogVersion,
                                        request.skipDelayedAck(),
                                        leaseStartTime
                                ))
                                .thenApply(updateResult -> {
                                    // Release short term locks.
                                    shortTermLocks.forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));

                                    return new ReplicaResult(true, updateResult);
                                });
                    });
        });
    }

    /**
     * Takes all required locks on a key before replacing the value.
     *
     * <p>An {@code IX} lock on the table and an {@code S} lock on the row are always taken. The {@code X} lock on the row and the
     * put locks on indexes are taken only when {@code oldRow} is present and matches {@code expectedRow}.
     *
     * @param expectedRow Row value the caller expects to replace.
     * @param oldRow Current row value, or {@code null} if there is none.
     * @param newRow Replacement row; used to take the index locks.
     * @param rowId Id of the row.
     * @param txId Transaction id.
     * @return Future completes with tuple {@link RowId} and collection of {@link Lock} or {@code null} if there is no suitable row.
     */
    private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForReplace(BinaryRow expectedRow, @Nullable BinaryRow oldRow,
            BinaryRow newRow, RowId rowId, UUID txId) {
        return lockManager.acquire(txId, new LockKey(tableLockKey), LockMode.IX)
                .thenCompose(tableLock -> lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.S)) // S lock on RowId
                .thenCompose(sharedRowLock -> {
                    boolean matchesExpected = oldRow != null && equalValues(oldRow, expectedRow);

                    if (!matchesExpected) {
                        // Nothing to replace; the S lock on the row is not released here.
                        return nullCompletedFuture();
                    }

                    return lockManager.acquire(txId, new LockKey(tableLockKey, rowId), LockMode.X) // X lock on RowId
                            .thenCompose(exclusiveRowLock -> takePutLockOnIndexes(newRow, rowId, txId))
                            .thenApply(indexLocks -> new IgniteBiTuple<>(rowId, indexLocks));
                });
    }

    /**
     * Resolves a read result to the corresponding binary row:
     * <ol>
     *     <li>A {@code null} read result resolves to {@code null}.</li>
     *     <li>An entry that is not a write intent resolves to its row and commit timestamp.</li>
     *     <li>For an RW read ({@code timestamp == null}), a write intent created by the same transaction {@code txId} resolves to the
     *     intent's row as is. Locks prevent an RW transaction from reading write intents created by others.</li>
     *     <li>Any other write intent goes through write intent resolution, see {@link #resolveWriteIntentAsync}.</li>
     * </ol>
     *
     * @param readResult Read result to resolve.
     * @param txId Nullable transaction id, should be provided if resolution is performed within the context of RW transaction.
     * @param timestamp Timestamp is used in RO transaction only.
     * @param lastCommitted Action to get the latest committed row.
     * @return Future to resolved binary row.
     */
    private CompletableFuture<@Nullable TimedBinaryRow> resolveReadResult(
            ReadResult readResult,
            @Nullable UUID txId,
            @Nullable HybridTimestamp timestamp,
            Supplier<@Nullable TimedBinaryRow> lastCommitted
    ) {
        if (readResult == null) {
            return nullCompletedFuture();
        }

        if (!readResult.isWriteIntent()) {
            return completedFuture(new TimedBinaryRow(readResult.binaryRow(), readResult.commitTimestamp()));
        }

        boolean rwRead = timestamp == null;

        if (rwRead && txId.equals(readResult.transactionId())) {
            // The write intent belongs to the reading transaction itself, so its value is visible as is.
            return completedFuture(new TimedBinaryRow(readResult.binaryRow()));
        }

        return resolveWriteIntentAsync(readResult, timestamp, lastCommitted);
    }

    /**
     * Resolves a write intent to the row visible to the reader. If the write intent is readable, it is returned together with its
     * transaction's commit timestamp; otherwise the result of {@code lastCommitted} is returned (which may be {@code null}).
     *
     * @param readResult Read result.
     * @param timestamp Timestamp.
     * @param lastCommitted Action to get a last committed row.
     * @return Result future.
     */
    private CompletableFuture<@Nullable TimedBinaryRow> resolveWriteIntentAsync(
            ReadResult readResult,
            @Nullable HybridTimestamp timestamp,
            Supplier<@Nullable TimedBinaryRow> lastCommitted
    ) {
        return inBusyLockAsync(busyLock, () -> {
            CompletableFuture<Boolean> readabilityFuture = resolveWriteIntentReadability(readResult, timestamp);

            return readabilityFuture.thenApply(readable -> inBusyLock(busyLock, () -> {
                if (!readable) {
                    // The write intent is not visible to this reader: fall back to the latest committed row.
                    return lastCommitted.get();
                }

                // The entry is still a write intent in the storage (so it carries a txId), but its transaction is already known to
                // be committed and the intent will be cleaned up by the task started in scheduleAsyncWriteIntentSwitch().
                // Hence it is safe to treat it as the latest committed entry.
                HybridTimestamp commitTs = txManager.stateMeta(readResult.transactionId()).commitTimestamp();

                return new TimedBinaryRow(readResult.binaryRow(), commitTs);
            }));
        });
    }

    /**
     * Schedules an async write intent switch action for the given write intent.
     *
     * <p>At most one switch per row is kept in {@code rowCleanupMap}: if one is already registered for {@code rowId}, no new one is
     * scheduled. The map entry is removed once the switch future completes, successfully or not.
     *
     * @param txId Transaction id.
     * @param rowId Id of a row that we want to clean up.
     * @param meta Resolved transaction state; must be a final state (checked by an assertion).
     */
    private void scheduleAsyncWriteIntentSwitch(UUID txId, RowId rowId, TransactionMeta meta) {
        TxState txState = meta.txState();

        assert isFinalState(txState) : "Unexpected state [txId=" + txId + ", txState=" + txState + ']';

        HybridTimestamp commitTimestamp = meta.commitTimestamp();

        // Add the resolved row to the set of write intents the transaction created.
        // If the volatile state was lost on restart, we'll have a single item in that set,
        // otherwise the set already contains this value.
        storageUpdateHandler.handleWriteIntentRead(txId, rowId);

        // If the volatile state was lost and we no longer know which rows were affected by this transaction,
        // it is possible that two concurrent RO transactions start resolving write intents for different rows
        // but created by the same transaction.

        // Both normal cleanup and single row cleanup are using txsPendingRowIds map to store write intents.
        // So we don't need a separate method to handle single row case.
        // NOTE(review): this code registers the switch in rowCleanupMap, not in a map named txsPendingRowIds — confirm which
        // structure the comment above refers to.

        // computeIfAbsent: the cleanup for this row may have already been triggered, e.g. we are resolving a write intent for an
        // RW transaction and a concurrent RO transaction resolves the same row. In that case the already registered future is
        // reused and the mapping function below is not invoked.
        CompletableFuture<?> future = rowCleanupMap.computeIfAbsent(rowId, k -> {
            // Only reached when no cleanup is registered for this row yet.

            // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
            return txManager.executeWriteIntentSwitchAsync(() -> inBusyLock(busyLock,
                    () -> storageUpdateHandler.switchWriteIntents(
                            txId,
                            // Presumably tells the handler whether to commit (true) or abort (false) the write intents — confirm.
                            txState == COMMITTED,
                            commitTimestamp,
                            indexIdsAtRwTxBeginTsOrNull(txId)
                    )
            )).whenComplete((unused, e) -> {
                // Only non-recoverable failures are logged. whenComplete does not swallow the failure: it is still
                // propagated through the returned future.
                if (e != null && !ReplicatorRecoverableExceptions.isRecoverable(e)) {
                    LOG.warn("Failed to complete transaction cleanup command [txId=" + txId + ']', e);
                }
            });
        });

        // Drop the entry once the switch completes (successfully or not). The two-argument remove only removes the mapping if it
        // still points to this exact future.
        future.handle((v, e) -> rowCleanupMap.remove(rowId, future));
    }

    /**
     * Checks whether the provided write intent can be read. If the state of the write intent's transaction turns out to be final,
     * an asynchronous write intent switch for the row is scheduled as a side effect.
     *
     * @param writeIntent Write intent to resolve.
     * @param timestamp Timestamp.
     * @return The future completes with {@code true} when the transaction is committed and commit time <= read time, {@code false}
     *         otherwise (when the transaction is either in progress, or aborted, or committed and commit time > read time).
     */
    private CompletableFuture<Boolean> resolveWriteIntentReadability(ReadResult writeIntent, @Nullable HybridTimestamp timestamp) {
        UUID writeIntentTxId = writeIntent.transactionId();
        ReplicationGroupId commitGroupId = replicationGroupId(writeIntent.commitTableOrZoneId(), writeIntent.commitPartitionId());

        return transactionStateResolver.resolveTxState(writeIntentTxId, commitGroupId, timestamp)
                .thenApply(txMeta -> {
                    if (isFinalState(txMeta.txState())) {
                        // The transaction outcome is known, so the write intent can be switched in the background.
                        scheduleAsyncWriteIntentSwitch(writeIntentTxId, writeIntent.rowId(), txMeta);
                    }

                    return canReadFromWriteIntent(writeIntentTxId, txMeta, timestamp);
                });
    }

    /**
     * Builds a replication group id for the given table or zone and partition: a {@link ZonePartitionId} when colocation is enabled,
     * a {@link TablePartitionId} otherwise.
     */
    private ReplicationGroupId replicationGroupId(int tableOrZoneId, int partitionId) {
        return nodeProperties.colocationEnabled()
                ? new ZonePartitionId(tableOrZoneId, partitionId)
                : new TablePartitionId(tableOrZoneId, partitionId);
    }

    /**
     * Decides whether write intents created by the given transaction may be read at the given timestamp.
     *
     * @param txId Transaction ID (used only for the assertion message).
     * @param txMeta Resolved transaction meta; its state must be either final or {@code PENDING}.
     * @param timestamp Read timestamp, or {@code null} to read the latest data.
     * @return {@code true} only when the transaction is committed and either the latest data is being read or the commit timestamp is
     *         not after the read timestamp.
     */
    private static Boolean canReadFromWriteIntent(UUID txId, TransactionMeta txMeta, @Nullable HybridTimestamp timestamp) {
        assert isFinalState(txMeta.txState()) || txMeta.txState() == PENDING
                : format("Unexpected state defined by write intent resolution [txId={}, txMeta={}].", txId, txMeta);

        if (txMeta.txState() != COMMITTED) {
            // ABORTED or PENDING: nothing written by this transaction is visible.
            return false;
        }

        // A missing read timestamp means "read the latest committed version".
        return timestamp == null || txMeta.commitTimestamp().compareTo(timestamp) <= 0;
    }

    /**
     * Captures the current clock time and validates schemas at that time for an RW read.
     *
     * <p>Waits for metadata completeness at the captured timestamp, then fails if the schema changed since the transaction start.
     *
     * @param txId Transaction ID.
     * @return Future that completes when the validation completes.
     */
    private CompletableFuture<Void> validateRwReadAgainstSchemaAfterTakingLocks(UUID txId) {
        HybridTimestamp nowTs = clockService.now();

        CompletableFuture<?> metadataComplete = schemaSyncService.waitForMetadataCompleteness(nowTs);

        return metadataComplete.thenRun(() -> failIfSchemaChangedSinceTxStart(txId, nowTs));
    }

    /**
     * Captures the current clock time and validates schemas at that time for a write.
     *
     * @param txId Transaction ID.
     * @return Future that completes with the reliable catalog version for the operation timestamp, after checking that the schema has
     *         not changed since the transaction start.
     */
    private CompletableFuture<Integer> validateWriteAgainstSchemaAfterTakingLocks(UUID txId) {
        HybridTimestamp opTs = clockService.current();

        CompletableFuture<Integer> catalogVersionFuture = reliableCatalogVersionFor(opTs);

        return catalogVersionFuture.thenApply(version -> {
            failIfSchemaChangedSinceTxStart(txId, opTs);

            return version;
        });
    }

    /**
     * Builds a single-row update command for this table.
     *
     * <p>The row payload message is attached only if there is either a row or a last commit timestamp to carry.
     *
     * @param commitPartitionId Commit partition ID.
     * @param rowUuid Row UUID.
     * @param row Row to write, or {@code null}.
     * @param lastCommitTimestamp Last commit timestamp, or {@code null}.
     * @param txId Transaction ID.
     * @param full Value for the command's {@code full} flag.
     * @param txCoordinatorId Transaction coordinator ID.
     * @param initiatorTime Initiator time, or {@code null}.
     * @param catalogVersion Required catalog version.
     * @param leaseStartTime Lease start time, or {@code null}.
     * @return Update command.
     */
    private UpdateCommand updateCommand(
            ReplicationGroupId commitPartitionId,
            UUID rowUuid,
            @Nullable BinaryRow row,
            @Nullable HybridTimestamp lastCommitTimestamp,
            UUID txId,
            boolean full,
            UUID txCoordinatorId,
            @Nullable HybridTimestamp initiatorTime,
            int catalogVersion,
            @Nullable Long leaseStartTime
    ) {
        UpdateCommandV2Builder commandBuilder = PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
                .tableId(tableId)
                .commitPartitionId(replicationGroupIdMessage(commitPartitionId))
                .rowUuid(rowUuid)
                .txId(txId)
                .full(full)
                .initiatorTime(initiatorTime)
                .txCoordinatorId(txCoordinatorId)
                .requiredCatalogVersion(catalogVersion)
                .leaseStartTime(leaseStartTime);

        if (lastCommitTimestamp == null && row == null) {
            // Nothing to attach as a row payload.
            return commandBuilder.build();
        }

        TimedBinaryRowMessageBuilder timedRowBuilder = PARTITION_REPLICATION_MESSAGES_FACTORY.timedBinaryRowMessage();

        if (lastCommitTimestamp != null) {
            timedRowBuilder.timestamp(lastCommitTimestamp);
        }

        if (row != null) {
            timedRowBuilder.binaryRowMessage(binaryRowMessage(row));
        }

        commandBuilder.messageRowToUpdate(timedRowBuilder.build());

        return commandBuilder.build();
    }

    /**
     * Converts a binary row into its network message form (tuple slice plus schema version).
     *
     * @param row Row to convert.
     * @return Binary row message.
     */
    private static BinaryRowMessage binaryRowMessage(BinaryRow row) {
        BinaryRowMessage message = PARTITION_REPLICATION_MESSAGES_FACTORY.binaryRowMessage()
                .binaryTuple(row.tupleSlice())
                .schemaVersion(row.schemaVersion())
                .build();

        return message;
    }

    /**
     * Builds a multi-row update command for this table.
     *
     * @param rowsToUpdate Row messages to write, keyed by UUID.
     * @param commitPartitionId Commit partition ID message.
     * @param transactionId Transaction ID.
     * @param initiatorTime Initiator time.
     * @param full Value for the command's {@code full} flag.
     * @param txCoordinatorId Transaction coordinator ID.
     * @param catalogVersion Required catalog version.
     * @param leaseStartTime Lease start time, or {@code null}.
     * @return Update-all command.
     */
    private UpdateAllCommand updateAllCommand(
            Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
            ReplicationGroupIdMessage commitPartitionId,
            UUID transactionId,
            HybridTimestamp initiatorTime,
            boolean full,
            UUID txCoordinatorId,
            int catalogVersion,
            @Nullable Long leaseStartTime
    ) {
        UpdateAllCommand command = PARTITION_REPLICATION_MESSAGES_FACTORY.updateAllCommandV2()
                .tableId(tableId)
                .commitPartitionId(commitPartitionId)
                .messageRowsToUpdate(rowsToUpdate)
                .txId(transactionId)
                .initiatorTime(initiatorTime)
                .full(full)
                .txCoordinatorId(txCoordinatorId)
                .requiredCatalogVersion(catalogVersion)
                .leaseStartTime(leaseStartTime)
                .build();

        return command;
    }

    /**
     * Delegates to the schema compatibility validator to fail if this table's schema changed after the transaction started.
     *
     * @param txId Transaction ID.
     * @param operationTimestamp Timestamp of the operation being validated.
     */
    private void failIfSchemaChangedSinceTxStart(UUID txId, HybridTimestamp operationTimestamp) {
        int currentTableId = tableId;

        schemaCompatValidator.failIfSchemaChangedAfterTxStart(txId, operationTimestamp, currentTableId);
    }

    /**
     * Returns a future of the reliable catalog version for the given timestamp.
     *
     * @param ts Timestamp.
     * @return Future with the catalog version.
     */
    private CompletableFuture<Integer> reliableCatalogVersionFor(HybridTimestamp ts) {
        CompletableFuture<Integer> versionFuture = reliableCatalogVersions.reliableCatalogVersionFor(ts);

        return versionFuture;
    }

    /**
     * Converts a {@link TablePartitionId} into its command representation, a {@link TablePartitionIdMessage}.
     *
     * @param tablePartId Table partition ID to convert.
     * @return Message built from the given table partition ID.
     */
    public static TablePartitionIdMessage tablePartitionId(TablePartitionId tablePartId) {
        TablePartitionIdMessage message = toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartId);

        return message;
    }

    /**
     * Converts a replication group ID into its message representation.
     *
     * @param groupId Replication group ID.
     * @return Replication group ID message.
     */
    private static ReplicationGroupIdMessage replicationGroupIdMessage(ReplicationGroupId groupId) {
        ReplicationGroupIdMessage message = toReplicationGroupIdMessage(REPLICA_MESSAGES_FACTORY, groupId);

        return message;
    }

    /**
     * Holder of the futures of operations that have been performed within a specific transaction.
     *
     * <p>Futures are grouped by request type and, within each type, keyed by {@link OperationId}.
     */
    private static class TxCleanupReadyFutureList {
        /** Operation futures: request type, then operation ID, mapped to the operation's future. */
        final Map<RequestType, Map<OperationId, CompletableFuture<?>>> futures = new EnumMap<>(RequestType.class);
    }

    /**
     * Shuts the listener down once: the first call blocks the busy lock and shuts down the index building processor; subsequent
     * calls do nothing.
     */
    @Override
    public void onShutdown() {
        boolean firstShutdownCall = stopGuard.compareAndSet(false, true);

        if (firstShutdownCall) {
            busyLock.block();

            indexBuildingProcessor.onShutdown();
        }
    }

    /** Returns the partition ID of this listener's replication group. */
    private int partId() {
        return this.replicationGroupId.partitionId();
    }

    /** Returns the ID of the table this listener serves. */
    private int tableId() {
        return this.tableId;
    }

    /**
     * Processes an operation request, surrounding it with bookkeeping that must be undone once the operation is over.
     *
     * <p>The bookkeeping consists of:
     * <ul>
     *     <li>the index building processor's RW operation count ({@code incrementRwOperationCountIfNeeded} /
     *     {@code decrementRwOperationCountIfNeeded});</li>
     *     <li>the LWM lock taken by {@link #tryToLockLwmIfNeeded} (released by {@link #unlockLwmIfNeeded}).</li>
     * </ul>
     *
     * <p>Both are released when the returned future completes, or right away if processing fails synchronously. If taking the LWM lock
     * fails, the RW operation count increment is reverted before the failure is rethrown.
     *
     * @param senderId Sender ID.
     * @param request Request to process.
     * @param replicaPrimacy Replica primacy.
     * @param opStartTsIfDirectRo Timestamp of operation start if the operation is a direct RO operation, {@code null} otherwise.
     * @return Future of the operation processing.
     */
    private CompletableFuture<?> processOperationRequestWithTxOperationManagementLogic(
            UUID senderId,
            ReplicaRequest request,
            ReplicaPrimacy replicaPrimacy,
            @Nullable HybridTimestamp opStartTsIfDirectRo
    ) {
        indexBuildingProcessor.incrementRwOperationCountIfNeeded(request);

        UUID txIdLockingLwm = lockLwmOrRevertRwOperationCount(request, opStartTsIfDirectRo);

        try {
            return processOperationRequest(senderId, request, replicaPrimacy, opStartTsIfDirectRo)
                    .whenComplete((unused, throwable) -> {
                        // try/finally: a failing LWM unlock must not leak the RW operation count.
                        try {
                            unlockLwmIfNeeded(txIdLockingLwm, request);
                        } finally {
                            indexBuildingProcessor.decrementRwOperationCountIfNeeded(request);
                        }
                    });
        } catch (Throwable e) {
            try {
                unlockLwmIfNeeded(txIdLockingLwm, request);
            } catch (Throwable unlockProblem) {
                e.addSuppressed(unlockProblem);
            }

            try {
                indexBuildingProcessor.decrementRwOperationCountIfNeeded(request);
            } catch (Throwable decrementProblem) {
                e.addSuppressed(decrementProblem);
            }
            throw e;
        }
    }

    /**
     * Calls {@link #tryToLockLwmIfNeeded}; if it throws, reverts the RW operation count increment made just before it, so the count
     * does not leak, and rethrows the original failure.
     *
     * @param request Request that is being handled.
     * @param opStartTsIfDirectRo Timestamp of operation start if the operation is a direct RO operation, {@code null} otherwise.
     * @return Result of {@link #tryToLockLwmIfNeeded}.
     */
    private @Nullable UUID lockLwmOrRevertRwOperationCount(ReplicaRequest request, @Nullable HybridTimestamp opStartTsIfDirectRo) {
        try {
            return tryToLockLwmIfNeeded(request, opStartTsIfDirectRo);
        } catch (Throwable e) {
            try {
                indexBuildingProcessor.decrementRwOperationCountIfNeeded(request);
            } catch (Throwable decrementProblem) {
                e.addSuppressed(decrementProblem);
            }
            throw e;
        }
    }

    /**
     * Creates a random fake transaction ID.
     *
     * <p>Such an ID identifies exactly one direct RO operation, and only for locking and unlocking LWM. It must not be used in place of a
     * real transaction ID anywhere else.
     */
    private static UUID newFakeTxId() {
        UUID fakeTxId = UUID.randomUUID();

        return fakeTxId;
    }

    /**
     * For an operation of an RO transaction, tries to lock LWM on the current node; other kinds of requests are left untouched.
     *
     * <p>LWM is locked:
     * <ul>
     *     <li>for a {@link ReadOnlyDirectMultiRowReplicaRequest} with more than one primary key, using a freshly generated fake transaction
     *     ID and the operation start timestamp;</li>
     *     <li>for a {@link ReadOnlyReplicaRequest}, using the request's transaction ID and read timestamp.</li>
     * </ul>
     *
     * <p>If the lock cannot be taken, a {@link TransactionException} with the
     * {@link Transactions#TX_STALE_READ_ONLY_OPERATION_ERR} error code is thrown.
     *
     * <p>For explicit RO transactions, the lock is released later, during the cleanup performed after the RO transaction has finished;
     * an automatic unlock on coordinator leave is also registered for them.
     *
     * <p>For direct RO operations (which happen in implicit RO transactions), LWM is unlocked right after the read has been done
     * (see {@link #unlockLwmIfNeeded(UUID, ReplicaRequest)}).
     *
     * @param request Request that is being handled.
     * @param opStartTsIfDirectRo Timestamp of operation start if the operation is a direct RO operation, {@code null} otherwise.
     * @return Transaction ID (real for an explicit transaction, fake for a direct RO operation) that was used to lock LWM, or
     *         {@code null} if LWM did not need to be locked.
     */
    private @Nullable UUID tryToLockLwmIfNeeded(ReplicaRequest request, @Nullable HybridTimestamp opStartTsIfDirectRo) {
        UUID lockOwnerId = null;
        HybridTimestamp lockTs = null;

        boolean directMultiKeyRead = request instanceof ReadOnlyDirectMultiRowReplicaRequest
                && ((ReadOnlyDirectMultiRowReplicaRequest) request).primaryKeys().size() > 1;

        if (directMultiKeyRead) {
            assert opStartTsIfDirectRo != null;

            lockOwnerId = newFakeTxId();
            lockTs = opStartTsIfDirectRo;
        } else if (request instanceof ReadOnlyReplicaRequest) {
            ReadOnlyReplicaRequest roRequest = (ReadOnlyReplicaRequest) request;

            lockOwnerId = roRequest.transactionId();
            lockTs = roRequest.readTimestamp();
        }

        if (lockOwnerId == null) {
            return null;
        }

        if (!lowWatermark.tryLock(lockOwnerId, lockTs)) {
            throw new TransactionException(Transactions.TX_STALE_READ_ONLY_OPERATION_ERR, "Read timestamp is not available anymore.");
        }

        registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(request, lockOwnerId);

        return lockOwnerId;
    }

    /**
     * For a {@link ReadOnlyReplicaRequest} carrying a coordinator ID, registers a remotely triggered resource whose release unlocks LWM
     * for the given transaction ID. The registration is tied to the request's coordinator.
     *
     * @param request Request that is being handled.
     * @param txIdToLockLwm Transaction ID that was used to lock LWM.
     */
    private void registerAutoLwmUnlockOnCoordinatorLeaveIfNeeded(ReplicaRequest request, UUID txIdToLockLwm) {
        if (!(request instanceof ReadOnlyReplicaRequest)) {
            return;
        }

        UUID coordinatorId = ((ReadOnlyReplicaRequest) request).coordinatorId();

        // TODO: remove null check after IGNITE-24120 is sorted out.
        if (coordinatorId == null) {
            return;
        }

        FullyQualifiedResourceId resourceId = new FullyQualifiedResourceId(txIdToLockLwm, txIdToLockLwm);
        remotelyTriggeredResourceRegistry.register(resourceId, coordinatorId, () -> () -> lowWatermark.unlock(txIdToLockLwm));
    }

    /**
     * Unlocks LWM for a direct RO request that locked it. For any other request, or when no lock was taken, does nothing.
     *
     * @param txIdToUnlockLwm Transaction ID that was used to lock LWM, or {@code null} if LWM was not locked.
     * @param request Request that has been handled.
     */
    private void unlockLwmIfNeeded(@Nullable UUID txIdToUnlockLwm, ReplicaRequest request) {
        if (txIdToUnlockLwm == null || !(request instanceof ReadOnlyDirectReplicaRequest)) {
            return;
        }

        lowWatermark.unlock(txIdToUnlockLwm);
    }

    /** Returns the IDs of this table's indexes as of the RW transaction's begin timestamp (delegates to {@code TableUtils}). */
    private List<Integer> indexIdsAtRwTxBeginTs(UUID txId) {
        int currentTableId = tableId;

        return TableUtils.indexIdsAtRwTxBeginTs(catalogService, txId, currentTableId);
    }

    /** Nullable variant of the index ID lookup at the RW transaction's begin timestamp (delegates to {@code TableUtils}). */
    private @Nullable List<Integer> indexIdsAtRwTxBeginTsOrNull(UUID txId) {
        int currentTableId = tableId;

        return TableUtils.indexIdsAtRwTxBeginTsOrNull(catalogService, txId, currentTableId);
    }

    /**
     * Returns this table's version in the catalog active at the given timestamp.
     *
     * @param ts Timestamp.
     * @return Table version.
     */
    private int tableVersionByTs(HybridTimestamp ts) {
        long tsValue = ts.longValue();
        Catalog activeCatalog = catalogService.activeCatalog(tsValue);

        CatalogTableDescriptor descriptor = activeCatalog.table(tableId());

        assert descriptor != null : "tableId=" + tableId() + ", catalogVersion=" + activeCatalog.version();

        return descriptor.tableVersion();
    }

    /** Unwraps the binary row from a timed row, passing {@code null} through. */
    private static @Nullable BinaryRow binaryRow(@Nullable TimedBinaryRow timedBinaryRow) {
        if (timedBinaryRow == null) {
            return null;
        }

        return timedBinaryRow.binaryRow();
    }

    /**
     * Upgrades a row to the target schema version using the table's schema registry; {@code null} is passed through.
     *
     * @param source Row to upgrade, or {@code null}.
     * @param targetSchemaVersion Target schema version.
     * @return Upgraded row, or {@code null} if {@code source} is {@code null}.
     */
    private @Nullable BinaryRow upgrade(@Nullable BinaryRow source, int targetSchemaVersion) {
        if (source == null) {
            return null;
        }

        BinaryRowUpgrader upgrader = new BinaryRowUpgrader(schemaRegistry, targetSchemaVersion);

        return upgrader.upgrade(source);
    }

    /**
     * Test-only helper that switches the write intents of the given transaction by calling the local storage update handler directly.
     *
     * @param txId Transaction ID.
     * @param commit Commit flag passed to {@code switchWriteIntents}.
     * @param commitTimestamp Commit timestamp, may be {@code null}.
     */
    @TestOnly
    public void cleanupLocally(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) {
        // NOTE(review): the last argument is always null here; its meaning is defined by switchWriteIntents — confirm before relying on it.
        storageUpdateHandler.switchWriteIntents(txId, commit, commitTimestamp, null);
    }

    /**
     * Operation unique identifier: the ID of the node that initiated the operation together with the operation timestamp.
     *
     * <p>Instances are immutable, so they are safe to use as hash map keys.
     */
    private static class OperationId {
        /** Operation node initiator id. */
        private final UUID initiatorId;

        /** Timestamp. */
        private final long ts;

        /**
         * The constructor.
         *
         * @param initiatorId Sender node id.
         * @param ts Timestamp.
         */
        public OperationId(UUID initiatorId, long ts) {
            this.initiatorId = initiatorId;
            this.ts = ts;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            OperationId that = (OperationId) o;

            return ts == that.ts && initiatorId.equals(that.initiatorId);
        }

        @Override
        public int hashCode() {
            // Long.hashCode(ts) == (int) (ts ^ (ts >>> 32)), so hash values are unchanged.
            return 31 * initiatorId.hashCode() + Long.hashCode(ts);
        }

        @Override
        public String toString() {
            return "OperationId [initiatorId=" + initiatorId + ", ts=" + ts + ']';
        }
    }
}
